Skip to content

Commit

Permalink
Implement notifyd-based notification strategy for universal variables
Browse files Browse the repository at this point in the history
(OS X specific)
  • Loading branch information
ridiculousfish committed Apr 30, 2014
1 parent 08d6e51 commit 60c8012
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 12 deletions.
86 changes: 86 additions & 0 deletions env_universal_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@
#include "env_universal_common.h"
#include "path.h"

#if __APPLE__
#define FISH_NOTIFYD_AVAILABLE 1
#include <notify.h>
#endif

/**
Non-wide version of the set command
*/
Expand Down Expand Up @@ -1497,12 +1502,87 @@ class universal_notifier_shmem_poller_t : public universal_notifier_t
return usec_per_sec / 3; //3 times a second
}
}
};

class universal_notifier_notifyd_t : public universal_notifier_t
{
int notify_fd;
int token;
std::string name;

void setup_notifyd()
{
#if FISH_NOTIFYD_AVAILABLE
// per notify(3), the user.uid.%d style is only accessible to processes with that uid
char local_name[256];
snprintf(local_name, sizeof local_name, "user.uid.%d.%ls.uvars", getuid(), program_name ? program_name : L"fish");
name.assign(local_name);

uint32_t status = notify_register_file_descriptor(name.c_str(), &this->notify_fd, 0, &this->token);
if (status != NOTIFY_STATUS_OK)
{
fprintf(stderr, "Warning: notify_register_file_descriptor() failed with status %u. Universal variable notifications may not be received.", status);
}
if (this->notify_fd >= 0)
{
// Mark us for non-blocking reads, with CLO_EXEC
int flags = fcntl(this->notify_fd, F_GETFL, 0);
fcntl(this->notify_fd, F_SETFL, flags | O_NONBLOCK | FD_CLOEXEC);
}
#endif
}

public:
universal_notifier_notifyd_t() : notify_fd(-1), token(0)
{
setup_notifyd();
}

~universal_notifier_notifyd_t()
{
if (token != 0)
{
notify_cancel(token);
}
}

int notification_fd()
{
return notify_fd;
}

void drain_notification_fd(int fd)
{
/* notifyd notifications come in as 32 bit values. We don't care about the value. We set ourselves as non-blocking, so just read until we can't read any more. */
assert(fd == notify_fd);
unsigned char buff[64];
ssize_t amt_read;
do
{
amt_read = read(notify_fd, buff, sizeof buff);

} while (amt_read == sizeof buff);
}

void post_notification()
{
#if FISH_NOTIFYD_AVAILABLE
uint32_t status = notify_post(name.c_str());
if (status != NOTIFY_STATUS_OK)
{
fprintf(stderr, "Warning: notify_post() failed with status %u. Universal variable notifications may not be sent.", status);
}
#endif
}
};

universal_notifier_t::notifier_strategy_t universal_notifier_t::resolve_default_strategy()
{
#if FISH_NOTIFYD_AVAILABLE
return strategy_notifyd;
#else
return strategy_shmem_polling;
#endif
}

universal_notifier_t &universal_notifier_t::default_notifier()
Expand All @@ -1522,6 +1602,9 @@ universal_notifier_t *universal_notifier_t::new_notifier_for_strategy(universal_
case strategy_shmem_polling:
return new universal_notifier_shmem_poller_t();

case strategy_notifyd:
return new universal_notifier_notifyd_t();

default:
fprintf(stderr, "Unsupported strategy %d\n", strat);
return NULL;
Expand Down Expand Up @@ -1561,3 +1644,6 @@ unsigned long universal_notifier_t::usec_delay_between_polls() const
return 0;
}

void universal_notifier_t::drain_notification_fd(int fd)
{
}
3 changes: 3 additions & 0 deletions env_universal_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@ class universal_notifier_t

/* Recommended delay between polls. A value of 0 means no polling required (so no timeout) */
virtual unsigned long usec_delay_between_polls() const;

/* The notification_fd is readable; drain it */
virtual void drain_notification_fd(int fd);
};

std::string get_machine_identifier();
Expand Down
56 changes: 46 additions & 10 deletions fish_tests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2240,8 +2240,35 @@ static void test_universal()
putc('\n', stderr);
}

bool poll_notifier(universal_notifier_t *note)
{
bool result = false;
if (note->needs_polling())
{
result = note->poll();
}
else
{
int fd = note->notification_fd();
if (fd > 0)
{
fd_set fds;
FD_ZERO(&fds);
FD_SET(fd, &fds);
struct timeval tv = {0, 0};
if (select(fd + 1, &fds, NULL, NULL, &tv) > 0 && FD_ISSET(fd, &fds))
{
note->drain_notification_fd(fd);
result = true;
}
}
}
return result;
}

static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy_t strategy)
{
assert(strategy != universal_notifier_t::strategy_default);
say(L"Testing universal notifiers with strategy %d", (int)strategy);
universal_notifier_t *notifiers[16];
size_t notifier_count = sizeof notifiers / sizeof *notifiers;
Expand All @@ -2255,40 +2282,46 @@ static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy
// Nobody should poll yet
for (size_t i=0; i < notifier_count; i++)
{
if (notifiers[i]->poll())
if (poll_notifier(notifiers[i]))
{
err(L"Universal varibale notifier poll() returned true before any changes, with strategy %d", (int)strategy);
err(L"Universal variable notifier polled true before any changes, with strategy %d", (int)strategy);
}
}

// Tweak each notifier. Verify that others see it.
for (size_t post_idx=0; post_idx < notifier_count; post_idx++)
{
notifiers[post_idx]->post_notification();

// notifyd requires a round trip to the notifyd server, which means we have to wait a little bit to receive it
// In practice, this seems to be enough
if (strategy != universal_notifier_t::strategy_shmem_polling)
{
usleep(1000000 / 25);
}
for (size_t i=0; i < notifier_count; i++)
{
// Skip the one who posted
// We aren't concerned with the one who posted
// Poll from it (to drain it), and then skip it
if (i == post_idx)
{
poll_notifier(notifiers[i]);
continue;
}

if (notifiers[i]->needs_polling())
if (! poll_notifier(notifiers[i]))
{
if (! notifiers[i]->poll())
{
err(L"Universal varibale notifier poll() failed to notice changes, with strategy %d", (int)strategy);
}
err(L"Universal variable notifier polled failed to notice changes, with strategy %d", (int)strategy);
}
}
}

// Nobody should poll now
for (size_t i=0; i < notifier_count; i++)
{
if (notifiers[i]->poll())
if (poll_notifier(notifiers[i]))
{
err(L"Universal varibale notifier poll() returned true after all changes, with strategy %d", (int)strategy);
err(L"Universal variable notifier polled true after all changes, with strategy %d", (int)strategy);
}
}

Expand All @@ -2303,6 +2336,9 @@ static void test_notifiers_with_strategy(universal_notifier_t::notifier_strategy
static void test_universal_notifiers()
{
test_notifiers_with_strategy(universal_notifier_t::strategy_shmem_polling);
#if __APPLE__
test_notifiers_with_strategy(universal_notifier_t::strategy_notifyd);
#endif
}

class history_tests_t
Expand Down
18 changes: 16 additions & 2 deletions input_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,15 @@ static wint_t readb()
/* Get our uvar notifier */
universal_notifier_t &notifier = universal_notifier_t::default_notifier();

/* Get the suggested delay */
/* Get the notification fd (possibly none) */
int notifier_fd = notifier.notification_fd();
if (notifier_fd > 0)
{
FD_SET(notifier_fd, &fdset);
fd_max = maxi(fd_max, notifier_fd);
}

/* Get the suggested delay (possibly none) */
struct timeval tv = {};
const unsigned long usecs_delay = notifier.usec_delay_between_polls();
if (usecs_delay > 0)
Expand Down Expand Up @@ -180,9 +188,15 @@ static wint_t readb()
}
}

/* Notifiers either poll or have an fd, not both. So at most one of these branches will be taken. */
if (notifier.poll())
{
fprintf(stderr, "Change note\n");
env_universal_barrier();
}

if (notifier_fd > 0 && FD_ISSET(notifier_fd, &fdset))
{
notifier.drain_notification_fd(notifier_fd);
env_universal_barrier();
}

Expand Down

0 comments on commit 60c8012

Please sign in to comment.