patch 8.0.0107
Problem: When reading channel output in a timer, messages may go missing.
(Skywind)
Solution: Add the "drop" option. Write error messages in the channel log.
Don't have ch_canread() check for the channel being open.
This commit is contained in:
@ -155,7 +155,13 @@ Use |ch_status()| to see if the channel could be opened.
|
|||||||
func MyCloseHandler(channel)
|
func MyCloseHandler(channel)
|
||||||
< Vim will invoke callbacks that handle data before invoking
|
< Vim will invoke callbacks that handle data before invoking
|
||||||
close_cb, thus when this function is called no more data will
|
close_cb, thus when this function is called no more data will
|
||||||
be received.
|
be passed to the callbacks.
|
||||||
|
*channel-drop*
|
||||||
|
"drop" Specifies when to drop messages:
|
||||||
|
"auto" When there is no callback to handle a message.
|
||||||
|
The "close_cb" is also considered for this.
|
||||||
|
"never" All messages will be kept.
|
||||||
|
|
||||||
*waittime*
|
*waittime*
|
||||||
"waittime" The time to wait for the connection to be made in
|
"waittime" The time to wait for the connection to be made in
|
||||||
milliseconds. A negative number waits forever.
|
milliseconds. A negative number waits forever.
|
||||||
|
|||||||
123
src/channel.c
123
src/channel.c
@ -1195,6 +1195,7 @@ channel_set_options(channel_T *channel, jobopt_T *opt)
|
|||||||
if (opt->jo_set & JO_CLOSE_CALLBACK)
|
if (opt->jo_set & JO_CLOSE_CALLBACK)
|
||||||
set_callback(&channel->ch_close_cb, &channel->ch_close_partial,
|
set_callback(&channel->ch_close_cb, &channel->ch_close_partial,
|
||||||
opt->jo_close_cb, opt->jo_close_partial);
|
opt->jo_close_cb, opt->jo_close_partial);
|
||||||
|
channel->ch_drop_never = opt->jo_drop_never;
|
||||||
|
|
||||||
if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER)
|
if ((opt->jo_set & JO_OUT_IO) && opt->jo_io[PART_OUT] == JIO_BUFFER)
|
||||||
{
|
{
|
||||||
@ -1918,6 +1919,7 @@ channel_parse_json(channel_T *channel, ch_part_T part)
|
|||||||
clear_tv(&listtv);
|
clear_tv(&listtv);
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
item->jq_no_callback = FALSE;
|
||||||
item->jq_value = alloc_tv();
|
item->jq_value = alloc_tv();
|
||||||
if (item->jq_value == NULL)
|
if (item->jq_value == NULL)
|
||||||
{
|
{
|
||||||
@ -2050,11 +2052,17 @@ remove_json_node(jsonq_T *head, jsonq_T *node)
|
|||||||
* When "id" is positive it must match the first number in the list.
|
* When "id" is positive it must match the first number in the list.
|
||||||
* When "id" is zero or negative jut get the first message. But not the one
|
* When "id" is zero or negative jut get the first message. But not the one
|
||||||
* with id ch_block_id.
|
* with id ch_block_id.
|
||||||
|
* When "without_callback" is TRUE also get messages that were pushed back.
|
||||||
* Return OK when found and return the value in "rettv".
|
* Return OK when found and return the value in "rettv".
|
||||||
* Return FAIL otherwise.
|
* Return FAIL otherwise.
|
||||||
*/
|
*/
|
||||||
static int
|
static int
|
||||||
channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
|
channel_get_json(
|
||||||
|
channel_T *channel,
|
||||||
|
ch_part_T part,
|
||||||
|
int id,
|
||||||
|
int without_callback,
|
||||||
|
typval_T **rettv)
|
||||||
{
|
{
|
||||||
jsonq_T *head = &channel->ch_part[part].ch_json_head;
|
jsonq_T *head = &channel->ch_part[part].ch_json_head;
|
||||||
jsonq_T *item = head->jq_next;
|
jsonq_T *item = head->jq_next;
|
||||||
@ -2064,10 +2072,11 @@ channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
|
|||||||
list_T *l = item->jq_value->vval.v_list;
|
list_T *l = item->jq_value->vval.v_list;
|
||||||
typval_T *tv = &l->lv_first->li_tv;
|
typval_T *tv = &l->lv_first->li_tv;
|
||||||
|
|
||||||
if ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
|
if ((without_callback || !item->jq_no_callback)
|
||||||
|
&& ((id > 0 && tv->v_type == VAR_NUMBER && tv->vval.v_number == id)
|
||||||
|| (id <= 0 && (tv->v_type != VAR_NUMBER
|
|| (id <= 0 && (tv->v_type != VAR_NUMBER
|
||||||
|| tv->vval.v_number == 0
|
|| tv->vval.v_number == 0
|
||||||
|| tv->vval.v_number != channel->ch_part[part].ch_block_id)))
|
|| tv->vval.v_number != channel->ch_part[part].ch_block_id))))
|
||||||
{
|
{
|
||||||
*rettv = item->jq_value;
|
*rettv = item->jq_value;
|
||||||
if (tv->v_type == VAR_NUMBER)
|
if (tv->v_type == VAR_NUMBER)
|
||||||
@ -2080,6 +2089,65 @@ channel_get_json(channel_T *channel, ch_part_T part, int id, typval_T **rettv)
|
|||||||
return FAIL;
|
return FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Put back "rettv" into the JSON queue, there was no callback for it.
|
||||||
|
* Takes over the values in "rettv".
|
||||||
|
*/
|
||||||
|
static void
|
||||||
|
channel_push_json(channel_T *channel, ch_part_T part, typval_T *rettv)
|
||||||
|
{
|
||||||
|
jsonq_T *head = &channel->ch_part[part].ch_json_head;
|
||||||
|
jsonq_T *item = head->jq_next;
|
||||||
|
jsonq_T *newitem;
|
||||||
|
|
||||||
|
if (head->jq_prev != NULL && head->jq_prev->jq_no_callback)
|
||||||
|
/* last item was pushed back, append to the end */
|
||||||
|
item = NULL;
|
||||||
|
else while (item != NULL && item->jq_no_callback)
|
||||||
|
/* append after the last item that was pushed back */
|
||||||
|
item = item->jq_next;
|
||||||
|
|
||||||
|
newitem = (jsonq_T *)alloc((unsigned)sizeof(jsonq_T));
|
||||||
|
if (newitem == NULL)
|
||||||
|
clear_tv(rettv);
|
||||||
|
else
|
||||||
|
{
|
||||||
|
newitem->jq_value = alloc_tv();
|
||||||
|
if (newitem->jq_value == NULL)
|
||||||
|
{
|
||||||
|
vim_free(newitem);
|
||||||
|
clear_tv(rettv);
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
newitem->jq_no_callback = FALSE;
|
||||||
|
*newitem->jq_value = *rettv;
|
||||||
|
if (item == NULL)
|
||||||
|
{
|
||||||
|
/* append to the end */
|
||||||
|
newitem->jq_prev = head->jq_prev;
|
||||||
|
head->jq_prev = newitem;
|
||||||
|
newitem->jq_next = NULL;
|
||||||
|
if (newitem->jq_prev == NULL)
|
||||||
|
head->jq_next = newitem;
|
||||||
|
else
|
||||||
|
newitem->jq_prev->jq_next = newitem;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
/* append after "item" */
|
||||||
|
newitem->jq_prev = item;
|
||||||
|
newitem->jq_next = item->jq_next;
|
||||||
|
item->jq_next = newitem;
|
||||||
|
if (newitem->jq_next == NULL)
|
||||||
|
head->jq_prev = newitem;
|
||||||
|
else
|
||||||
|
newitem->jq_next->jq_prev = newitem;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#define CH_JSON_MAX_ARGS 4
|
#define CH_JSON_MAX_ARGS 4
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -2410,11 +2478,11 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
|
|||||||
int argc = 0;
|
int argc = 0;
|
||||||
|
|
||||||
/* Get any json message in the queue. */
|
/* Get any json message in the queue. */
|
||||||
if (channel_get_json(channel, part, -1, &listtv) == FAIL)
|
if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
|
||||||
{
|
{
|
||||||
/* Parse readahead, return when there is still no message. */
|
/* Parse readahead, return when there is still no message. */
|
||||||
channel_parse_json(channel, part);
|
channel_parse_json(channel, part);
|
||||||
if (channel_get_json(channel, part, -1, &listtv) == FAIL)
|
if (channel_get_json(channel, part, -1, FALSE, &listtv) == FAIL)
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2454,7 +2522,7 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
|
|||||||
{
|
{
|
||||||
/* If there is a close callback it may use ch_read() to get the
|
/* If there is a close callback it may use ch_read() to get the
|
||||||
* messages. */
|
* messages. */
|
||||||
if (channel->ch_close_cb == NULL)
|
if (channel->ch_close_cb == NULL && !channel->ch_drop_never)
|
||||||
drop_messages(channel, part);
|
drop_messages(channel, part);
|
||||||
return FALSE;
|
return FALSE;
|
||||||
}
|
}
|
||||||
@ -2531,7 +2599,7 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
|
|||||||
{
|
{
|
||||||
int done = FALSE;
|
int done = FALSE;
|
||||||
|
|
||||||
/* invoke the one-time callback with the matching nr */
|
/* JSON or JS mode: invoke the one-time callback with the matching nr */
|
||||||
for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next)
|
for (cbitem = cbhead->cq_next; cbitem != NULL; cbitem = cbitem->cq_next)
|
||||||
if (cbitem->cq_seq_nr == seq_nr)
|
if (cbitem->cq_seq_nr == seq_nr)
|
||||||
{
|
{
|
||||||
@ -2540,7 +2608,17 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if (!done)
|
if (!done)
|
||||||
ch_logn(channel, "Dropping message %d without callback", seq_nr);
|
{
|
||||||
|
if (channel->ch_drop_never)
|
||||||
|
{
|
||||||
|
/* message must be read with ch_read() */
|
||||||
|
channel_push_json(channel, part, listtv);
|
||||||
|
listtv = NULL;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
ch_logn(channel, "Dropping message %d without callback",
|
||||||
|
seq_nr);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
else if (callback != NULL || buffer != NULL)
|
else if (callback != NULL || buffer != NULL)
|
||||||
{
|
{
|
||||||
@ -2567,7 +2645,7 @@ may_invoke_callback(channel_T *channel, ch_part_T part)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
ch_log(channel, "Dropping message");
|
ch_logn(channel, "Dropping message %d", seq_nr);
|
||||||
|
|
||||||
if (listtv != NULL)
|
if (listtv != NULL)
|
||||||
free_tv(listtv);
|
free_tv(listtv);
|
||||||
@ -2792,9 +2870,10 @@ channel_close(channel_T *channel, int invoke_close_cb)
|
|||||||
redraw_after_callback();
|
redraw_after_callback();
|
||||||
}
|
}
|
||||||
|
|
||||||
/* any remaining messages are useless now */
|
if (!channel->ch_drop_never)
|
||||||
for (part = PART_SOCK; part < PART_IN; ++part)
|
/* any remaining messages are useless now */
|
||||||
drop_messages(channel, part);
|
for (part = PART_SOCK; part < PART_IN; ++part)
|
||||||
|
drop_messages(channel, part);
|
||||||
}
|
}
|
||||||
|
|
||||||
channel->ch_nb_close_cb = NULL;
|
channel->ch_nb_close_cb = NULL;
|
||||||
@ -3091,9 +3170,9 @@ ch_close_part_on_error(
|
|||||||
channel_close_now(channel_T *channel)
|
channel_close_now(channel_T *channel)
|
||||||
{
|
{
|
||||||
ch_log(channel, "Closing channel because all readable fds are closed");
|
ch_log(channel, "Closing channel because all readable fds are closed");
|
||||||
channel_close(channel, TRUE);
|
|
||||||
if (channel->ch_nb_close_cb != NULL)
|
if (channel->ch_nb_close_cb != NULL)
|
||||||
(*channel->ch_nb_close_cb)();
|
(*channel->ch_nb_close_cb)();
|
||||||
|
channel_close(channel, TRUE);
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -3243,7 +3322,7 @@ channel_read_block(channel_T *channel, ch_part_T part, int timeout)
|
|||||||
* When "id" is -1 accept any message;
|
* When "id" is -1 accept any message;
|
||||||
* Blocks until the message is received or the timeout is reached.
|
* Blocks until the message is received or the timeout is reached.
|
||||||
*/
|
*/
|
||||||
int
|
static int
|
||||||
channel_read_json_block(
|
channel_read_json_block(
|
||||||
channel_T *channel,
|
channel_T *channel,
|
||||||
ch_part_T part,
|
ch_part_T part,
|
||||||
@ -3264,7 +3343,7 @@ channel_read_json_block(
|
|||||||
more = channel_parse_json(channel, part);
|
more = channel_parse_json(channel, part);
|
||||||
|
|
||||||
/* search for message "id" */
|
/* search for message "id" */
|
||||||
if (channel_get_json(channel, part, id, rettv) == OK)
|
if (channel_get_json(channel, part, id, TRUE, rettv) == OK)
|
||||||
{
|
{
|
||||||
chanpart->ch_block_id = 0;
|
chanpart->ch_block_id = 0;
|
||||||
return OK;
|
return OK;
|
||||||
@ -4290,6 +4369,20 @@ get_job_options(typval_T *tv, jobopt_T *opt, int supported)
|
|||||||
return FAIL;
|
return FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else if (STRCMP(hi->hi_key, "drop") == 0)
|
||||||
|
{
|
||||||
|
int never = FALSE;
|
||||||
|
val = get_tv_string(item);
|
||||||
|
|
||||||
|
if (STRCMP(val, "never") == 0)
|
||||||
|
never = TRUE;
|
||||||
|
else if (STRCMP(val, "auto") != 0)
|
||||||
|
{
|
||||||
|
EMSG2(_(e_invarg2), "drop");
|
||||||
|
return FAIL;
|
||||||
|
}
|
||||||
|
opt->jo_drop_never = never;
|
||||||
|
}
|
||||||
else if (STRCMP(hi->hi_key, "exit_cb") == 0)
|
else if (STRCMP(hi->hi_key, "exit_cb") == 0)
|
||||||
{
|
{
|
||||||
if (!(supported & JO_EXIT_CB))
|
if (!(supported & JO_EXIT_CB))
|
||||||
|
|||||||
@ -1786,7 +1786,7 @@ f_ceil(typval_T *argvars, typval_T *rettv)
|
|||||||
static void
|
static void
|
||||||
f_ch_canread(typval_T *argvars, typval_T *rettv)
|
f_ch_canread(typval_T *argvars, typval_T *rettv)
|
||||||
{
|
{
|
||||||
channel_T *channel = get_channel_arg(&argvars[0], TRUE, TRUE, 0);
|
channel_T *channel = get_channel_arg(&argvars[0], FALSE, FALSE, 0);
|
||||||
|
|
||||||
rettv->vval.v_number = 0;
|
rettv->vval.v_number = 0;
|
||||||
if (channel != NULL)
|
if (channel != NULL)
|
||||||
|
|||||||
@ -42,6 +42,9 @@ static int confirm_msg_used = FALSE; /* displaying confirm_msg */
|
|||||||
static char_u *confirm_msg = NULL; /* ":confirm" message */
|
static char_u *confirm_msg = NULL; /* ":confirm" message */
|
||||||
static char_u *confirm_msg_tail; /* tail of confirm_msg */
|
static char_u *confirm_msg_tail; /* tail of confirm_msg */
|
||||||
#endif
|
#endif
|
||||||
|
#ifdef FEAT_JOB_CHANNEL
|
||||||
|
static int emsg_to_channel_log = FALSE;
|
||||||
|
#endif
|
||||||
|
|
||||||
struct msg_hist
|
struct msg_hist
|
||||||
{
|
{
|
||||||
@ -166,6 +169,14 @@ msg_attr_keep(
|
|||||||
&& STRCMP(s, last_msg_hist->msg)))
|
&& STRCMP(s, last_msg_hist->msg)))
|
||||||
add_msg_hist(s, -1, attr);
|
add_msg_hist(s, -1, attr);
|
||||||
|
|
||||||
|
#ifdef FEAT_JOB_CHANNEL
|
||||||
|
if (emsg_to_channel_log)
|
||||||
|
{
|
||||||
|
/* Write message in the channel log. */
|
||||||
|
ch_logs(NULL, "ERROR: %s", (char *)s);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
/* When displaying keep_msg, don't let msg_start() free it, caller must do
|
/* When displaying keep_msg, don't let msg_start() free it, caller must do
|
||||||
* that. */
|
* that. */
|
||||||
if (s == keep_msg)
|
if (s == keep_msg)
|
||||||
@ -556,6 +567,7 @@ emsg(char_u *s)
|
|||||||
{
|
{
|
||||||
int attr;
|
int attr;
|
||||||
char_u *p;
|
char_u *p;
|
||||||
|
int r;
|
||||||
#ifdef FEAT_EVAL
|
#ifdef FEAT_EVAL
|
||||||
int ignore = FALSE;
|
int ignore = FALSE;
|
||||||
int severe;
|
int severe;
|
||||||
@ -624,6 +636,9 @@ emsg(char_u *s)
|
|||||||
}
|
}
|
||||||
redir_write(s, -1);
|
redir_write(s, -1);
|
||||||
}
|
}
|
||||||
|
#ifdef FEAT_JOB_CHANNEL
|
||||||
|
ch_logs(NULL, "ERROR: %s", (char *)s);
|
||||||
|
#endif
|
||||||
return TRUE;
|
return TRUE;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -650,6 +665,9 @@ emsg(char_u *s)
|
|||||||
* and a redraw is expected because
|
* and a redraw is expected because
|
||||||
* msg_scrolled is non-zero */
|
* msg_scrolled is non-zero */
|
||||||
|
|
||||||
|
#ifdef FEAT_JOB_CHANNEL
|
||||||
|
emsg_to_channel_log = TRUE;
|
||||||
|
#endif
|
||||||
/*
|
/*
|
||||||
* Display name and line number for the source of the error.
|
* Display name and line number for the source of the error.
|
||||||
*/
|
*/
|
||||||
@ -659,7 +677,12 @@ emsg(char_u *s)
|
|||||||
* Display the error message itself.
|
* Display the error message itself.
|
||||||
*/
|
*/
|
||||||
msg_nowait = FALSE; /* wait for this msg */
|
msg_nowait = FALSE; /* wait for this msg */
|
||||||
return msg_attr(s, attr);
|
r = msg_attr(s, attr);
|
||||||
|
|
||||||
|
#ifdef FEAT_JOB_CHANNEL
|
||||||
|
emsg_to_channel_log = FALSE;
|
||||||
|
#endif
|
||||||
|
return r;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -33,7 +33,6 @@ void channel_close_in(channel_T *channel);
|
|||||||
void channel_clear(channel_T *channel);
|
void channel_clear(channel_T *channel);
|
||||||
void channel_free_all(void);
|
void channel_free_all(void);
|
||||||
char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
|
char_u *channel_read_block(channel_T *channel, ch_part_T part, int timeout);
|
||||||
int channel_read_json_block(channel_T *channel, ch_part_T part, int timeout_arg, int id, typval_T **rettv);
|
|
||||||
void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
|
void common_channel_read(typval_T *argvars, typval_T *rettv, int raw);
|
||||||
channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
|
channel_T *channel_fd2channel(sock_T fd, ch_part_T *partp);
|
||||||
void channel_handle_events(void);
|
void channel_handle_events(void);
|
||||||
|
|||||||
@ -1474,6 +1474,7 @@ struct jsonq_S
|
|||||||
typval_T *jq_value;
|
typval_T *jq_value;
|
||||||
jsonq_T *jq_next;
|
jsonq_T *jq_next;
|
||||||
jsonq_T *jq_prev;
|
jsonq_T *jq_prev;
|
||||||
|
int jq_no_callback; /* TRUE when no callback was found */
|
||||||
};
|
};
|
||||||
|
|
||||||
struct cbq_S
|
struct cbq_S
|
||||||
@ -1597,6 +1598,7 @@ struct channel_S {
|
|||||||
partial_T *ch_partial;
|
partial_T *ch_partial;
|
||||||
char_u *ch_close_cb; /* call when channel is closed */
|
char_u *ch_close_cb; /* call when channel is closed */
|
||||||
partial_T *ch_close_partial;
|
partial_T *ch_close_partial;
|
||||||
|
int ch_drop_never;
|
||||||
|
|
||||||
job_T *ch_job; /* Job that uses this channel; this does not
|
job_T *ch_job; /* Job that uses this channel; this does not
|
||||||
* count as a reference to avoid a circular
|
* count as a reference to avoid a circular
|
||||||
@ -1684,6 +1686,7 @@ typedef struct
|
|||||||
partial_T *jo_close_partial; /* not referenced! */
|
partial_T *jo_close_partial; /* not referenced! */
|
||||||
char_u *jo_exit_cb; /* not allocated! */
|
char_u *jo_exit_cb; /* not allocated! */
|
||||||
partial_T *jo_exit_partial; /* not referenced! */
|
partial_T *jo_exit_partial; /* not referenced! */
|
||||||
|
int jo_drop_never;
|
||||||
int jo_waittime;
|
int jo_waittime;
|
||||||
int jo_timeout;
|
int jo_timeout;
|
||||||
int jo_out_timeout;
|
int jo_out_timeout;
|
||||||
|
|||||||
@ -764,6 +764,8 @@ static char *(features[]) =
|
|||||||
|
|
||||||
static int included_patches[] =
|
static int included_patches[] =
|
||||||
{ /* Add new patch number below this line */
|
{ /* Add new patch number below this line */
|
||||||
|
/**/
|
||||||
|
107,
|
||||||
/**/
|
/**/
|
||||||
106,
|
106,
|
||||||
/**/
|
/**/
|
||||||
|
|||||||
Reference in New Issue
Block a user