Asynchronous Operations: Extensions

Async on its own is a useful construct that can save time through cooperative multitasking. It is especially useful for database access and caching, web resource access, and working with streams.

MySQL

The async MySQL extension is similar to the mysqli extension that comes with HHVM. It is primarily used for asynchronously creating connections to, and querying, MySQL databases.

The full API contains all of the classes and methods available for accessing MySQL via async; we will only cover a few of the more common scenarios here.

The primary class for connecting to a MySQL database is AsyncMysqlConnectionPool and its primary method is the async connect.

The primary class for querying a database is AsyncMysqlConnection. Its two main query methods, query and queryf, are both async. There is also a method, escapeString, for making strings safe to embed in a query.

The primary class for retrieving results from a query is an abstract class called AsyncMysqlResult, which itself has two concrete subclasses called AsyncMysqlQueryResult and AsyncMysqlErrorResult. The main methods on these classes are vectorRows and mapRows, both non-async.

Here is a simple example that shows how to get a user name from a database using this extension:

<?hh // strict

namespace Hack\UserDocumentation\AsyncOps\Extensions\Examples\MySQL;

require __DIR__.'/async_mysql_connect.inc.php';

use \Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

async function get_connection(): Awaitable<\AsyncMysqlConnection> {
  // Get a connection pool with default options
  $pool = new \AsyncMysqlConnectionPool(array());
  // Change credentials to something that works in order to test this code
  return await $pool->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
}

async function fetch_user_name(
  \AsyncMysqlConnection $conn,
  int $user_id,
): Awaitable<?string> {
  // Your table and column may differ, of course
  $result = await $conn->queryf(
    'SELECT name from test_table WHERE userID = %d',
    $user_id,
  );
  // There shouldn't be more than one row returned for one user id
  invariant($result->numRows() === 1, 'one row exactly');
  // A vector of vector objects holding the string values of each column
  // in the query
  $vector = $result->vectorRows();
  return $vector[0][0]; // We had one column in our query
}

async function get_user_info(
  \AsyncMysqlConnection $conn,
  string $user,
): Awaitable<Vector<Map<string, ?string>>> {
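  // queryf escapes %s arguments itself, so $user is passed as-is;
  // escapeString is only needed when building a query string by hand
  $result = await $conn->queryf(
    'SELECT * from test_table WHERE name = %s',
    $user,
  );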
  // A vector of map objects holding the string values of each column
  // in the query, and the keys being the column names
  $map = $result->mapRows();
  return $map;
}

async function async_mysql_tutorial(): Awaitable<void> {
  $conn = await get_connection();
  if ($conn !== null) {
    $result = await fetch_user_name($conn, 2);
    \var_dump($result);
    $info = await get_user_info($conn, 'Fred Emmott');
    \var_dump($info is vec<_>);
    \var_dump($info[0] is dict<_, _>);
  }
}

<<__EntryPoint>>
function main(): void {
  \HH\Asio\join(async_mysql_tutorial());
}
Output
string(11) "Fred Emmott"
bool(true)
bool(true)
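
For queries built by hand and sent with the plain query method, escapeString is what makes a user-supplied value safe to embed. The following is a minimal sketch, meant to sit next to the functions in the example above and reusing its test_table table and name column; count_users_named is a hypothetical helper, not part of the extension.

```hack
async function count_users_named(
  \AsyncMysqlConnection $conn,
  string $name,
): Awaitable<int> {
  // query() sends the string as given, so escape the value before embedding it
  $safe_name = $conn->escapeString($name);
  $result = await $conn->query(
    "SELECT COUNT(*) FROM test_table WHERE name = '".$safe_name."'",
  );
  // One row with one column; column values come back as nullable strings
  $count = $result->vectorRows()[0][0];
  return (int)$count;
}
```

It could be called as `await count_users_named($conn, 'Fred Emmott')` from async_mysql_tutorial.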

Connection Pools

The async MySQL extension does not support multiplexing; each concurrent query requires its own connection. However, the extension does support connection pooling.

Pooling lets us reuse connection objects instead of creating a new connection every time we want to make a query. The class is AsyncMysqlConnectionPool, and a pool can be created like this:

<?hh // strict

namespace Hack\UserDocumentation\AsyncOps\Extensions\Examples\MySQLConnectionPool;

require __DIR__.'/async_mysql_connect.inc.php';

use \Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

function get_pool(): \AsyncMysqlConnectionPool {
  return new \AsyncMysqlConnectionPool(
    array('pool_connection_limit' => 100),
  ); // See API for more pool options
}

async function get_connection(): Awaitable<\AsyncMysqlConnection> {
  $pool = get_pool();
  $conn = await $pool->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
  return $conn;
}

async function run(): Awaitable<void> {
  $conn = await get_connection();
  \var_dump($conn);
}

<<__EntryPoint>>
function main(): void {
  \HH\Asio\join(run());
}
Output
object(AsyncMysqlConnection)#6 (0) {
}

Using connection pools for MySQL connections is highly recommended. If for some reason we really need a single asynchronous client without a pool, the AsyncMysqlClient class provides a connect method.
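
As a rough sketch of the non-pooled route, the function below assumes that AsyncMysqlClient::connect is called statically and takes the same five leading arguments as the pool's connect method. get_single_connection is a hypothetical helper name, and the credentials are passed in rather than read from ConnectionInfo.

```hack
async function get_single_connection(
  string $host,
  int $port,
  string $db,
  string $user,
  string $passwd,
): Awaitable<\AsyncMysqlConnection> {
  // Static call: no pool object is involved, so the connection is not
  // tracked or reused by a pool afterwards
  return await \AsyncMysqlClient::connect($host, $port, $db, $user, $passwd);
}
```

The returned connection offers the same query, queryf and escapeString methods as one obtained from a pool.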

MCRouter

MCRouter is a memcached protocol-routing library. To help with memcached deployment, it provides features such as connection pooling and prefix-based routing.

The async MCRouter extension is essentially an async subset of the Memcached extension that is part of HHVM. The primary class is MCRouter, and there are two ways to create an instance. The createSimple method takes a vector of addresses of the servers where memcached is running. The __construct method is more configurable and accepts a wider set of options. Once we have an object, we can use the async versions of the core memcached protocol methods such as add, get and del.

Here is a simple example showing how one might get a user name from memcached:

<?hh // strict

namespace Hack\UserDocumentation\AsyncOps\Extensions\Examples\MCRouter;

require __DIR__."/../../../../vendor/hh_autoload.php"; // For wrap()

function get_mcrouter_object(): \MCRouter {
  $servers = Vector {\getenv('HHVM_TEST_MCROUTER')};
  $mc = \MCRouter::createSimple($servers);
  return $mc;
}

async function add_user_name(
  \MCRouter $mcr,
  int $id,
  string $value,
): Awaitable<void> {
  $key = 'name:'.$id;
  await $mcr->set($key, $value);
}

async function get_user_name(\MCRouter $mcr, int $user_id): Awaitable<string> {
  $key = 'name:'.$user_id;
  try {
    $res = await \HH\Asio\wrap($mcr->get($key));
    if ($res->isSucceeded()) {
      return $res->getResult();
    }
    return "";
  } catch (\MCRouterException $ex) {
    echo $ex->getKey().\PHP_EOL.$ex->getOp();
    return "";
  }
}

async function run(): Awaitable<void> {
  $mcr = get_mcrouter_object();
  await add_user_name($mcr, 1, 'Joel');
  $name = await get_user_name($mcr, 1);
  \var_dump($name); // Should print "Joel"
}

<<__EntryPoint>>
function main(): void {
  \HH\Asio\join(run());
}
Output
string(4) "Joel"

If an issue occurs when using this protocol, one of two exceptions can be thrown: MCRouterException when something goes wrong with a core operation, like deleting a key; and MCRouterOptionException when the provided option list can't be parsed.
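
To illustrate the first case, here is a minimal sketch of a delete that reports failure instead of letting the exception propagate. delete_user_name is a hypothetical helper that follows the key scheme of the example above, and it is assumed to live in the same file.

```hack
async function delete_user_name(
  \MCRouter $mcr,
  int $user_id,
): Awaitable<bool> {
  $key = 'name:'.$user_id;
  try {
    await $mcr->del($key);
    return true;
  } catch (\MCRouterException $ex) {
    // The exception records which operation failed and on which key
    echo $ex->getOp().' failed for '.$ex->getKey().\PHP_EOL;
    return false;
  }
}
```

Returning a bool keeps the caller free of try/catch; whether that is preferable to rethrowing depends on how the cache miss or failure should be handled upstream.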

cURL

Hack currently provides two async functions for cURL.

curl_multi_await

cURL is a library for transferring data to and from URLs. The async cURL extension provides two functions, one of which is a wrapper around the other. curl_multi_await is the async version of HHVM's curl_multi_select. It waits until there is activity on the cURL multi handle; once the await completes, we call curl_multi_exec to process the result, just as we would in the non-async case.

async function curl_multi_await(resource $mh, float $timeout = 1.0): Awaitable<int>;
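
The loop described above might look roughly like the following sketch. It assumes a recent HHVM in which curl_multi_exec takes its second argument as inout; fetch_with_multi_handle is a hypothetical helper name, and error handling is deliberately left out.

```hack
async function fetch_with_multi_handle(string $url): Awaitable<string> {
  $ch = \curl_init($url);
  \curl_setopt($ch, \CURLOPT_RETURNTRANSFER, true);
  $mh = \curl_multi_init();
  \curl_multi_add_handle($mh, $ch);

  $active = 1;
  do {
    // Drive the transfer; $active receives the number of running handles
    $status = \curl_multi_exec($mh, inout $active);
    if ($active > 0 && $status === \CURLM_OK) {
      // Let other async work run until there is activity or 1.0s passes.
      // A result of -1 means there was nothing to wait on; production
      // code may want a short sleep in that case to avoid spinning.
      await \curl_multi_await($mh, 1.0);
    }
  } while ($active > 0 && $status === \CURLM_OK);

  $content = (string)\curl_multi_getcontent($ch);
  \curl_multi_remove_handle($mh, $ch);
  \curl_close($ch);
  \curl_multi_close($mh);
  return $content;
}
```

The only difference from a synchronous loop is that the wait is awaited rather than blocking, which is what lets other awaitables make progress in the meantime.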

curl_exec

The function HH\Asio\curl_exec is a wrapper around curl_multi_await. It is easy to use because we can pass it a URL string directly and do not have to create a cURL resource ourselves.

namespace HH\Asio {
  async function curl_exec(mixed $urlOrHandle): Awaitable<string>;
}

Here is an example of getting a vector of URL contents, using a lambda expression to cut down on the code verbosity that would come with full closure syntax:

<?hh // strict

namespace Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncCurl;

function get_urls(): vec<string> {
  return vec[
    "http://example.com",
    "http://example.net",
    "http://example.org",
  ];
}

async function get_combined_contents(
  vec<string> $urls,
): Awaitable<vec<string>> {
  // Use lambda shorthand syntax here instead of full closure syntax
  $handles = \HH\Lib\Vec\map_with_key(
    $urls,
    ($idx, $url) ==> \HH\Asio\curl_exec($url),
  );
  $contents = await \HH\Lib\Vec\from_async($handles);
  echo \HH\Lib\C\count($contents)."\n";
  return $contents;
}

<<__EntryPoint>>
function main(): void {
  \HH\Asio\join(get_combined_contents(get_urls()));
}
Output
3

Streams

The async stream extension has one function, stream_await, which is functionally similar to HHVM's stream_select. It waits for a stream to become ready for the requested events (reading or writing) and reports the outcome as a status such as STREAM_AWAIT_READY, but without the multiplexing functionality of stream_select. We can use HH\Lib\Vec\from_async to await multiple stream handles, but the resulting combined awaitable won't be complete until all of the underlying streams have completed.

async function stream_await(resource $fp, int $events, float $timeout = 0.0): Awaitable<int>;

The following example shows how to use stream_await to write to resources:

<?hh // strict

namespace Hack\UserDocumentation\AsyncOps\Extensions\Examples\AsyncStream;
use namespace HH\Lib\Vec;

function get_resources(): vec<resource> {
  $r1 = \fopen('php://stdout', 'w');
  $r2 = \fopen('php://stdout', 'w');
  $r3 = \fopen('php://stdout', 'w');

  return vec[$r1, $r2, $r3];
}

async function write_all(vec<resource> $resources): Awaitable<void> {
  $write_single_resource = async function(resource $r) {
    $status = await \stream_await($r, \STREAM_AWAIT_WRITE, 1.0);
    if ($status === \STREAM_AWAIT_READY) {
      \fwrite($r, \str_shuffle('ABCDEF').\PHP_EOL);
    }
  };
  // You will get 3 shuffled strings, each on a separate line.
  await Vec\from_async(Vec\map($resources, $write_single_resource));
}

<<__EntryPoint>>
function main(): void {
  \HH\Asio\join(write_all(get_resources()));
}
Output
CFEADB
EDBACF
CBDEFA
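
Reading follows the same pattern with STREAM_AWAIT_READ. The sketch below is a hypothetical helper, read_line_or_null, that treats every status other than STREAM_AWAIT_READY (a timeout, for instance) as "no data" and returns null.

```hack
async function read_line_or_null(resource $r): Awaitable<?string> {
  // Wait up to one second for the stream to become readable
  $status = await \stream_await($r, \STREAM_AWAIT_READ, 1.0);
  if ($status === \STREAM_AWAIT_READY) {
    $line = \fgets($r);
    // fgets reports failure with a non-string value
    return $line is string ? $line : null;
  }
  // Not ready within the timeout, or the stream was closed or errored
  return null;
}
```

A caller that needs to tell a timeout from a closed stream would return the status itself instead of collapsing it to null.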