Skip to content

Commit 54f142a

Browse files
tadeas-vintrlikmichalvasko
authored andcommitted
session_client FEATURE add locks around message buffer
The message buffers in the client session data were merged into one. A function recv_msg shared by recv_reply and recv_notif was added. It was a critical section so a lock was added.
1 parent b3d0ea5 commit 54f142a

3 files changed

Lines changed: 173 additions & 123 deletions

File tree

src/session.c

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -194,6 +194,8 @@ nc_new_session(NC_SIDE side, int shared_ti)
194194

195195
pthread_mutex_init(&sess->opts.server.ch_lock, NULL);
196196
pthread_cond_init(&sess->opts.server.ch_cond, NULL);
197+
} else {
198+
pthread_mutex_init(&sess->opts.client.msgs_lock, NULL);
197199
}
198200

199201
if (!shared_ti) {
@@ -599,25 +601,21 @@ nc_session_free(struct nc_session *session, void (*data_free)(void *))
599601
}
600602

601603
if (session->side == NC_CLIENT) {
602-
/* cleanup message queues */
603-
/* notifications */
604-
for (contiter = session->opts.client.notifs; contiter; ) {
605-
ly_in_free(contiter->msg, 1);
604+
/* MSGS LOCK */
605+
pthread_mutex_lock(&session->opts.client.msgs_lock);
606606

607-
p = contiter;
608-
contiter = contiter->next;
609-
free(p);
610-
}
611-
612-
/* rpc replies */
613-
for (contiter = session->opts.client.replies; contiter; ) {
607+
/* cleanup message queue*/
608+
for (contiter = session->opts.client.msgs; contiter; ) {
614609
ly_in_free(contiter->msg, 1);
615610

616611
p = contiter;
617612
contiter = contiter->next;
618613
free(p);
619614
}
620615

616+
/* MSGS UNLOCK */
617+
pthread_mutex_unlock(&session->opts.client.msgs_lock);
618+
621619
/* receive any leftover messages */
622620
while (nc_read_msg_poll_io(session, 0, &msg) == 1) {
623621
ly_in_free(msg, 1);
@@ -847,6 +845,8 @@ nc_session_free(struct nc_session *session, void (*data_free)(void *))
847845
/* free CH synchronization structures */
848846
pthread_cond_destroy(&session->opts.server.ch_cond);
849847
pthread_mutex_destroy(&session->opts.server.ch_lock);
848+
} else {
849+
pthread_mutex_destroy(&session->opts.client.msgs_lock);
850850
}
851851

852852
free(session);

src/session_client.c

Lines changed: 156 additions & 107 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
#include <arpa/inet.h>
2222
#include <assert.h>
23+
#include <ctype.h>
2324
#include <errno.h>
2425
#include <fcntl.h>
2526
#include <netdb.h>
@@ -1661,53 +1662,171 @@ recv_reply_check_msgid(struct nc_session *session, const struct lyd_node *envp,
16611662
return NC_MSG_REPLY;
16621663
}
16631664

1665+
/**
1666+
* @brief Used to roughly estimate the type of the message, does not actually parse or verify it.
1667+
*
1668+
* @param[in] session NETCONF session used to send error messages.
1669+
* @param[in] msg Message to check for type.
1670+
* @return NC_MSG_REPLY If format roughly matches a rpc-reply;
1671+
* @return NC_MSG_NOTIF If format roughly matches a notification;
1672+
* @return NC_MSG_ERROR If format is malformed or unrecognized.
1673+
*/
16641674
static NC_MSG_TYPE
1665-
recv_reply(struct nc_session *session, int timeout, struct lyd_node *op, uint64_t msgid, struct lyd_node **envp)
1675+
get_msg_type(struct nc_session *session, struct ly_in *msg)
16661676
{
1667-
int r;
1668-
LY_ERR lyrc;
1677+
const char *str, *end;
1678+
1679+
str = ly_in_memory(msg, NULL);
1680+
1681+
while (*str) {
1682+
/* Skip whitespaces */
1683+
while (isspace(*str)) {
1684+
str++;
1685+
}
1686+
1687+
if (*str == '<') {
1688+
str++;
1689+
if (!strncmp(str, "!--", 3)) {
1690+
/* Skip comments */
1691+
end = "-->";
1692+
str = strstr(str, end);
1693+
} else if (!strncmp(str, "?xml", 4)) {
1694+
/* Skip xml declaration */
1695+
end = "?>";
1696+
str = strstr(str, end);
1697+
} else if (!strncmp(str, "rpc-reply", 9)) {
1698+
return NC_MSG_REPLY;
1699+
} else if (!strncmp(str, "notification", 12)) {
1700+
return NC_MSG_NOTIF;
1701+
} else {
1702+
ERR(session, "Unknown xml element '%.10s'.", str);
1703+
return NC_MSG_ERROR;
1704+
}
1705+
if (!str) {
1706+
/* No matching ending tag found */
1707+
ERR(session, "No matching ending tag '%s' found in xml message.", end);
1708+
return NC_MSG_ERROR;
1709+
}
1710+
str += strlen(end);
1711+
} else {
1712+
/* Not a valid xml */
1713+
ERR(session, "Unexpected character '%c' in xml message.", *str);
1714+
return NC_MSG_ERROR;
1715+
}
1716+
}
1717+
1718+
/* Unexpected end of message */
1719+
ERR(session, "Unexpected end of xml message.");
1720+
return NC_MSG_ERROR;
1721+
}
1722+
1723+
/**
1724+
* @brief Function to receive either replies or notifications.
1725+
*
1726+
* @param[in] session NETCONF session from which this function receives messages.
1727+
* @param[in] timeout Timeout for reading in milliseconds. Use negative value for infinite.
1728+
* @param[in] expected Type of the message the caller desired.
1729+
* @param[out] message If receiving a message succeeded this is the message, NULL otherwise.
1730+
* @return NC_MSG_REPLY If a rpc-reply was received;
1731+
* @return NC_MSG_NOTIF If a notification was received;
1732+
* @return NC_MSG_ERROR If any error occured;
1733+
* @return NC_MSG_WOULDBLOCK If the timeout was reached.
1734+
*/
1735+
static NC_MSG_TYPE
1736+
recv_msg(struct nc_session *session, int timeout, NC_MSG_TYPE expected, struct ly_in **message)
1737+
{
1738+
struct nc_msg_cont **cont_ptr;
16691739
struct ly_in *msg = NULL;
1670-
struct nc_msg_cont *cont, **cont_ptr;
1740+
struct nc_msg_cont *cont, *prev;
16711741
NC_MSG_TYPE ret = NC_MSG_ERROR;
1742+
int r;
16721743

1673-
assert(op && (op->schema->nodetype & (LYS_RPC | LYS_ACTION)));
1674-
1675-
*envp = NULL;
1744+
*message = NULL;
16761745

1677-
/* try to get rpc-reply from the session's queue */
1678-
while (session->opts.client.replies) {
1679-
cont = session->opts.client.replies;
1680-
session->opts.client.replies = cont->next;
1746+
/* MSGS LOCK */
1747+
pthread_mutex_lock(&session->opts.client.msgs_lock);
16811748

1682-
msg = cont->msg;
1683-
free(cont);
1749+
/* Find the expected message in the buffer */
1750+
prev = NULL;
1751+
for (cont = session->opts.client.msgs; cont && cont->type != expected; cont = cont->next) {
1752+
prev = cont;
1753+
}
16841754

1685-
/* parse */
1686-
lyrc = lyd_parse_op(NULL, op, msg, LYD_XML, LYD_TYPE_REPLY_NETCONF, envp, NULL);
1687-
if (!lyrc) {
1688-
ret = recv_reply_check_msgid(session, *envp, msgid);
1689-
goto cleanup;
1690-
} else if (lyrc != LY_ENOT) {
1691-
lyd_free_tree(*envp);
1692-
*envp = NULL;
1693-
ERR(session, "Received an invalid message (%s).", ly_errmsg(LYD_CTX(op)));
1694-
goto cleanup;
1755+
if (cont) {
1756+
/* Remove found message from buffer */
1757+
if (prev) {
1758+
prev->next = cont->next;
16951759
} else {
1696-
/* it was not a notification so it is nothing known */
1697-
ERR(session, "Received an unexpected message.");
1760+
session->opts.client.msgs = cont->next;
16981761
}
16991762

1700-
/* try the next message */
1701-
ly_in_free(msg, 1);
1702-
msg = NULL;
1763+
/* Use the buffer message */
1764+
ret = cont->type;
1765+
msg = cont->msg;
1766+
free(cont);
1767+
goto cleanup;
17031768
}
17041769

1705-
/* read message from wire */
1770+
/* Read a message from the wire */
17061771
r = nc_read_msg_poll_io(session, timeout, &msg);
17071772
if (!r) {
17081773
ret = NC_MSG_WOULDBLOCK;
17091774
goto cleanup;
17101775
} else if (r == -1) {
1776+
ret = NC_MSG_ERROR;
1777+
goto cleanup;
1778+
}
1779+
1780+
/* Basic check to determine message type */
1781+
ret = get_msg_type(session, msg);
1782+
if (ret == NC_MSG_ERROR) {
1783+
goto cleanup;
1784+
}
1785+
1786+
/* If received a message of different type store it in the buffer */
1787+
if (ret != expected) {
1788+
cont_ptr = &session->opts.client.msgs;
1789+
while (*cont_ptr) {
1790+
cont_ptr = &((*cont_ptr)->next);
1791+
}
1792+
*cont_ptr = malloc(sizeof **cont_ptr);
1793+
if (!*cont_ptr) {
1794+
ERRMEM;
1795+
ret = NC_MSG_ERROR;
1796+
goto cleanup;
1797+
}
1798+
(*cont_ptr)->msg = msg;
1799+
(*cont_ptr)->type = ret;
1800+
msg = NULL;
1801+
(*cont_ptr)->next = NULL;
1802+
}
1803+
1804+
cleanup:
1805+
/* MSGS UNLOCK */
1806+
pthread_mutex_unlock(&session->opts.client.msgs_lock);
1807+
1808+
if (ret == expected) {
1809+
*message = msg;
1810+
} else {
1811+
ly_in_free(msg, 1);
1812+
}
1813+
return ret;
1814+
}
1815+
1816+
static NC_MSG_TYPE
1817+
recv_reply(struct nc_session *session, int timeout, struct lyd_node *op, uint64_t msgid, struct lyd_node **envp)
1818+
{
1819+
LY_ERR lyrc;
1820+
struct ly_in *msg = NULL;
1821+
NC_MSG_TYPE ret = NC_MSG_ERROR;
1822+
1823+
assert(op && (op->schema->nodetype & (LYS_RPC | LYS_ACTION)));
1824+
1825+
*envp = NULL;
1826+
1827+
/* Receive messages until a rpc-reply is found or a timeout or error reached */
1828+
ret = recv_msg(session, timeout, NC_MSG_REPLY, &msg);
1829+
if (ret != NC_MSG_REPLY) {
17111830
goto cleanup;
17121831
}
17131832

@@ -1716,30 +1835,12 @@ recv_reply(struct nc_session *session, int timeout, struct lyd_node *op, uint64_
17161835
if (!lyrc) {
17171836
ret = recv_reply_check_msgid(session, *envp, msgid);
17181837
goto cleanup;
1719-
} else if (lyrc != LY_ENOT) {
1720-
lyd_free_tree(*envp);
1721-
*envp = NULL;
1838+
} else {
17221839
ERR(session, "Received an invalid message (%s).", ly_errmsg(LYD_CTX(op)));
1840+
ret = NC_MSG_ERROR;
17231841
goto cleanup;
17241842
}
17251843

1726-
/* assume a notification, reset and store it */
1727-
ly_in_reset(msg);
1728-
cont_ptr = &session->opts.client.notifs;
1729-
while (*cont_ptr) {
1730-
cont_ptr = &((*cont_ptr)->next);
1731-
}
1732-
*cont_ptr = malloc(sizeof **cont_ptr);
1733-
if (!*cont_ptr) {
1734-
ERRMEM;
1735-
goto cleanup;
1736-
}
1737-
(*cont_ptr)->msg = msg;
1738-
msg = NULL;
1739-
(*cont_ptr)->next = NULL;
1740-
1741-
ret = NC_MSG_NOTIF;
1742-
17431844
cleanup:
17441845
ly_in_free(msg, 1);
17451846
return ret;
@@ -1951,81 +2052,29 @@ nc_recv_reply(struct nc_session *session, struct nc_rpc *rpc, uint64_t msgid, in
19512052
static NC_MSG_TYPE
19522053
recv_notif(struct nc_session *session, int timeout, struct lyd_node **envp, struct lyd_node **op)
19532054
{
1954-
int r;
19552055
LY_ERR lyrc;
19562056
struct ly_in *msg = NULL;
1957-
struct nc_msg_cont *cont, **cont_ptr;
19582057
NC_MSG_TYPE ret = NC_MSG_ERROR;
19592058

19602059
*op = NULL;
19612060
*envp = NULL;
19622061

1963-
/* try to get notification from the session's queue */
1964-
while (session->opts.client.notifs) {
1965-
cont = session->opts.client.notifs;
1966-
session->opts.client.notifs = cont->next;
1967-
1968-
msg = cont->msg;
1969-
free(cont);
1970-
1971-
/* parse */
1972-
lyrc = lyd_parse_op(session->ctx, NULL, msg, LYD_XML, LYD_TYPE_NOTIF_NETCONF, envp, op);
1973-
if (!lyrc) {
1974-
ret = NC_MSG_NOTIF;
1975-
goto cleanup;
1976-
} else if (lyrc != LY_ENOT) {
1977-
lyd_free_tree(*envp);
1978-
*envp = NULL;
1979-
ERR(session, "Received an invalid message (%s).", ly_errmsg(session->ctx));
1980-
goto cleanup;
1981-
} else {
1982-
/* it was not a rpc-reply so it is nothing known */
1983-
ERR(session, "Received an unexpected message.");
1984-
}
1985-
1986-
/* try the next message */
1987-
ly_in_free(msg, 1);
1988-
msg = NULL;
1989-
}
1990-
1991-
/* read message from wire */
1992-
r = nc_read_msg_poll_io(session, timeout, &msg);
1993-
if (!r) {
1994-
ret = NC_MSG_WOULDBLOCK;
1995-
goto cleanup;
1996-
} else if (r == -1) {
2062+
/* Receive messages until a notification is found or a timeout or error reached */
2063+
ret = recv_msg(session, timeout, NC_MSG_NOTIF, &msg);
2064+
if (ret != NC_MSG_NOTIF) {
19972065
goto cleanup;
19982066
}
19992067

2000-
/* parse */
2068+
/* Parse */
20012069
lyrc = lyd_parse_op(session->ctx, NULL, msg, LYD_XML, LYD_TYPE_NOTIF_NETCONF, envp, op);
20022070
if (!lyrc) {
2003-
ret = NC_MSG_NOTIF;
20042071
goto cleanup;
2005-
} else if (lyrc != LY_ENOT) {
2006-
lyd_free_tree(*envp);
2007-
*envp = NULL;
2072+
} else {
20082073
ERR(session, "Received an invalid message (%s).", ly_errmsg(session->ctx));
2074+
ret = NC_MSG_ERROR;
20092075
goto cleanup;
20102076
}
20112077

2012-
/* assume a rpc-reply, reset and store it */
2013-
ly_in_reset(msg);
2014-
cont_ptr = &session->opts.client.replies;
2015-
while (*cont_ptr) {
2016-
cont_ptr = &((*cont_ptr)->next);
2017-
}
2018-
*cont_ptr = malloc(sizeof **cont_ptr);
2019-
if (!*cont_ptr) {
2020-
ERRMEM;
2021-
goto cleanup;
2022-
}
2023-
(*cont_ptr)->msg = msg;
2024-
msg = NULL;
2025-
(*cont_ptr)->next = NULL;
2026-
2027-
ret = NC_MSG_REPLY;
2028-
20292078
cleanup:
20302079
ly_in_free(msg, 1);
20312080
return ret;

0 commit comments

Comments
 (0)