Async: Extensions

Async is a highly useful construct on its own, and its cooperative multitasking infrastructure can save time. However, we knew there would be a common set of functionality where async would be most useful: database access and caching, web resource access, and streams.

MySQL

The async MySQL extension is similar to the mysqli extension that comes with HHVM. It is primarily used for asynchronously creating connections to, and querying, MySQL databases.

The full API contains all of the classes and methods available for accessing MySQL via async; we cover a few of the more common scenarios here.

The primary class for connecting to a MySQL database is AsyncMysqlConnectionPool and its primary method is the async connect().

The primary class for querying a database is AsyncMysqlConnection, with two main query methods, query() and queryf(), both async. There is also a non-async method, escapeString(), that helps ensure that the queries to be executed are safe.

The primary class for retrieving results from a query is an abstract class called AsyncMysqlResult, which itself has two concrete subclasses called AsyncMysqlQueryResult and AsyncMysqlErrorResult. The main methods on these classes are vectorRows() and mapRows(), both non-async.

<?hh
class AsyncMysqlConnectionPool {
  public function __construct(array $pool_options): void;
  public async function connect(
    string $host,
    int $port,
    string $dbname,
    string $user,
    string $password,
    int $timeout_micros = -1,
    string $extra_key = ""): Awaitable<AsyncMysqlConnection>;
  // More methods in this class, of course
}

class AsyncMysqlConnection {
  public function query(string $query, int $timeout_micros)
    : Awaitable<AsyncMysqlResult>;
  public function queryf(string $pattern, ...$args)
    : Awaitable<AsyncMysqlResult>;
  public function escapeString(string $data): string;
  // More methods in this class, of course
}

class AsyncMysqlQueryResult extends AsyncMysqlResult {
  public function vectorRows(): Vector; // vector of Vectors
  public function mapRows(): Vector; // vector of Maps
  // return db column values as Hack types instead of
  // string representations of those types.
  public function vectorRowsTyped(): Vector;
  public function mapRowsTyped(): Vector;
  // More methods in this class, of course
}

class AsyncMysqlErrorResult extends AsyncMysqlResult {
  public function failureType(): string;
  public function mysql_errno(): int;
  public function mysql_error(): string;
  // More methods in this class, of course
}

Here is a simple example that shows how to get a user name from a database with this extension.

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\MySQL;

use \Hack\UserDocumentation\Async\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

async function get_connection(): Awaitable<\AsyncMysqlConnection> {
  // Get a connection pool with default options
  $pool = new \AsyncMysqlConnectionPool(array());
  // Change credentials to something that works in order to test this code
  return await $pool->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
}

async function fetch_user_name(\AsyncMysqlConnection $conn,
                               int $user_id) : Awaitable<string> {
  // Your table and column may differ, of course
  $result = await $conn->queryf(
    'SELECT name from test_table WHERE userID = %d',
    $user_id
  );
  // There shouldn't be more than one row returned for one user id
  invariant($result->numRows() === 1, 'one row exactly');
  // A vector of vector objects holding the string values of each column
  // in the query
  $vector = $result->vectorRows();
  return $vector[0][0]; // We had one column in our query
}

async function get_user_info(\AsyncMysqlConnection $conn,
                             string $user): Awaitable<Vector<Map>> {
  $result = await $conn->queryf(
    'SELECT * from test_table WHERE name = %s',
    $conn->escapeString($user)
  );
  // A vector of map objects holding the string values of each column
  // in the query, and the keys being the column names
  $map = $result->mapRows();
  return $map;
}

async function async_mysql_tutorial(): Awaitable<void> {
  $conn = await get_connection();
  if ($conn !== null) {
    $result = await fetch_user_name($conn, 2);
    var_dump($result);
    $info = await get_user_info($conn, 'Fred Emmott');
    var_dump($info instanceof Vector);
    var_dump($info[0] instanceof Map);
  }
}

\HH\Asio\join(async_mysql_tutorial());

Output
string(11) "Fred Emmott"
bool(true)
bool(true)
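
The example above uses queryf(), which substitutes its arguments into the pattern. As a minimal sketch of the other query method, the following builds the SQL string by hand with query() and relies on escapeString() to make the user-supplied value safe. The function name find_user_ids and the 5-second timeout are assumptions made for illustration; the table and column names are reused from the example above and may differ in your schema.

```hack
<?hh

// Hypothetical helper: look up the IDs of all users with a given name.
async function find_user_ids(\AsyncMysqlConnection $conn,
                             string $name): Awaitable<Vector<string>> {
  // query() performs no substitution of its own, so escape the value
  // before placing it inside the quoted SQL literal
  $safe_name = $conn->escapeString($name);
  $result = await $conn->query(
    "SELECT userID FROM test_table WHERE name = '" . $safe_name . "'",
    5 * 1000000 // timeout in microseconds (assumed value: 5 seconds)
  );
  $ids = Vector {};
  // mapRows() yields one Map per row, keyed by column name
  foreach ($result->mapRows() as $row) {
    $ids[] = $row['userID'];
  }
  return $ids;
}
```

Prefer queryf() when the query fits a pattern; this sketch is only meant to show where escapeString() belongs when you assemble the string yourself.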

Connection Pools

The async MySQL extension does not support multiplexing -- each concurrent query requires its own connection. However, the extension does support connection pooling.

The async MySQL extension provides a mechanism to pool connection objects so you don't have to create a new connection every time you want to make a query. The class is AsyncMysqlConnectionPool and one can be created like this:

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\MySQLConnectionPool;

use \Hack\UserDocumentation\Async\Extensions\Examples\AsyncMysql\ConnectionInfo as CI;

function get_pool(): \AsyncMysqlConnectionPool {
  return new \AsyncMysqlConnectionPool(
    array('pool_connection_limit' => 100)
  ); // See API for more pool options
}

async function get_connection(): Awaitable<\AsyncMysqlConnection> {
  $pool = get_pool();
  $conn = await $pool->connect(
    CI::$host,
    CI::$port,
    CI::$db,
    CI::$user,
    CI::$passwd,
  );
  return $conn;
}

async function run(): Awaitable<void> {
  $conn = await get_connection();
  var_dump($conn);
}

\HH\Asio\join(run());

Output
object(AsyncMysqlConnection)#6 (0) {
}

It is highly recommended that you use connection pools for your MySQL connections. If, for some reason, you really need a single asynchronous client, there is an AsyncMysqlClient class that provides a connect() method.
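
Because each concurrent query needs its own connection, one way to run several queries at once is to request a separate connection from a shared pool for each of them. The following is a rough sketch under that assumption; the helper names, the connection details ('127.0.0.1', 3306, 'test_db', 'user', 'password'), and the test_table schema are placeholders, not part of the extension.

```hack
<?hh

// Hypothetical helper: run one query on its own pooled connection.
async function fetch_name_on_own_connection(
  \AsyncMysqlConnectionPool $pool,
  int $user_id
): Awaitable<string> {
  // Placeholder credentials; change them to something that works
  $conn = await $pool->connect(
    '127.0.0.1', 3306, 'test_db', 'user', 'password'
  );
  $result = await $conn->queryf(
    'SELECT name from test_table WHERE userID = %d',
    $user_id
  );
  invariant($result->numRows() === 1, 'one row exactly');
  return $result->vectorRows()[0][0];
}

// Hypothetical helper: fetch several names concurrently.
async function fetch_names(Vector<int> $user_ids): Awaitable<Vector<string>> {
  $pool = new \AsyncMysqlConnectionPool(
    array('pool_connection_limit' => 100)
  );
  // One awaitable, and therefore one connection, per user id
  $awaitables = $user_ids->map(
    $id ==> fetch_name_on_own_connection($pool, $id)
  );
  // v() completes once every query has finished
  return await \HH\Asio\v($awaitables);
}
```

Sharing one pool object across the calls is what lets the pool_connection_limit option bound the total number of connections opened.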

MCRouter

MCRouter is a memcached protocol routing library. To help with your memcached deployment, it provides features such as connection pooling and prefix-based routing.

The async MCRouter extension is essentially an async subset of the Memcached extension that is part of HHVM. The primary class is MCRouter. There are two ways to create an MCRouter instance: createSimple() takes a vector of server addresses where memcached is running, while the more configurable __construct() accepts an array of options. Once you have an object, you can use the async versions of the core memcached protocol methods, such as add(), get(), and del().

class MCRouter {
  public function __construct(array<string, mixed> $options, string $pid = '');
  public static function createSimple(ConstVector<string> $servers): MCRouter;
  public async function add(string $key, string $value, int $flags = 0,
                            int $expiration = 0): Awaitable<void>;
  public async function get(string $key): Awaitable<string>;
  public async function del(string $key): Awaitable<void>;
  // More methods exist, of course, such as the set() used in the example below
}

Here is a simple example showing how one might get a user name from memcached:

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\MCRouter;

require __DIR__ . "/../../../../vendor/hh_autoload.php"; // For wrap()

function get_mcrouter_object(): \MCRouter {
  $servers = Vector { getenv('HHVM_TEST_MCROUTER') };
  $mc = \MCRouter::createSimple($servers);
  return $mc;
}

async function add_user_name(
  \MCRouter $mcr,
  int $id,
  string $value): Awaitable<void> {
  $key = 'name:' . $id;
  await $mcr->set($key, $value);
}

async function get_user_name(\MCRouter $mcr, int $user_id): Awaitable<string> {
  $key = 'name:' . $user_id;
  try {
    $res = await \HH\Asio\wrap($mcr->get($key));
    if ($res->isSucceeded()) {
      return $res->getResult();
    }
    return "";
  } catch (\MCRouterException $ex) {
    echo $ex->getKey() . PHP_EOL . $ex->getOp();
    return "";
  }
}

async function run(): Awaitable<void> {
  $mcr = get_mcrouter_object();
  await add_user_name($mcr, 1, 'Joel');
  $name = await get_user_name($mcr, 1);
  var_dump($name); // Should print "Joel"
}

\HH\Asio\join(run());

Output
string(4) "Joel"

If an issue occurs when using this protocol, two possible exceptions can be thrown. MCRouterException is thrown when something goes wrong with a core operation, like deleting a key. MCRouterOptionException is thrown when you provide a non-parsable option list.
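
As a minimal sketch of handling both exceptions, the helpers below catch each one at the point where it would be raised. The function names are hypothetical, and it is an assumption here that a failed del() surfaces as an MCRouterException on the await and that the constructor is where a bad option list is rejected.

```hack
<?hh

// Hypothetical helper: delete a cached user name, reporting a failure
// instead of letting the exception propagate.
async function delete_user_name(\MCRouter $mcr, int $id): Awaitable<bool> {
  $key = 'name:' . $id;
  try {
    await $mcr->del($key);
    return true;
  } catch (\MCRouterException $ex) {
    // getKey() and getOp() identify the key and operation that failed
    echo 'Failed: key ' . $ex->getKey() . ', op ' . $ex->getOp() . PHP_EOL;
    return false;
  }
}

// Hypothetical helper: build an MCRouter from an option list, returning
// null if the options are rejected (assumed to happen in the constructor).
function create_mcrouter(array<string, mixed> $options): ?\MCRouter {
  try {
    return new \MCRouter($options);
  } catch (\MCRouterOptionException $ex) {
    echo 'Bad MCRouter options: ' . $ex->getMessage() . PHP_EOL;
    return null;
  }
}
```

Awaiting the operation directly, rather than through HH\Asio\wrap(), is what allows the catch block to see the exception.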

cURL

Hack currently provides two async functions for cURL.

curl_multi_await

cURL is a library for transferring data to and from URLs. The async cURL extension provides two functions, one of which is a wrapper around the other. curl_multi_await() is the async version of HHVM's curl_multi_select(). It waits until there is activity on the cURL handle; when it completes, you use curl_multi_exec() to process the result, just as you would in the non-async situation.

async function curl_multi_await(resource $mh,
                                float $timeout = 1.0): Awaitable<int>;
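
The following is a rough sketch of how curl_multi_await() can stand in for curl_multi_select() in the usual multi-handle loop. The function name fetch_with_multi_await and the 10 ms back-off are assumptions for illustration, and the by-reference call to curl_multi_exec() is written in the style of the HHVM versions contemporary with this document.

```hack
<?hh

// Hypothetical helper: fetch one URL by driving a multi handle manually.
async function fetch_with_multi_await(string $url): Awaitable<string> {
  $ch = curl_init($url);
  // Return the body as a string instead of printing it
  curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);

  $mh = curl_multi_init();
  curl_multi_add_handle($mh, $ch);

  $active = 1;
  do {
    // Let cURL do whatever work is currently possible
    do {
      $status = curl_multi_exec($mh, $active);
    } while ($status == CURLM_CALL_MULTI_PERFORM);
    if (!$active) {
      break; // the transfer has finished
    }
    // Yield to other async work until there is activity on the handle
    $select = await curl_multi_await($mh);
    if ($select == -1) {
      // Nothing to wait on yet; back off briefly before polling again
      await \HH\Asio\usleep(10 * 1000);
    }
  } while ($status == CURLM_OK);

  $content = (string) curl_multi_getcontent($ch);
  curl_multi_remove_handle($mh, $ch);
  curl_multi_close($mh);
  return $content;
}
```

The only change from the non-async pattern is the await: while this function is waiting on the network, other awaitables are free to make progress.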

curl_exec

HH\Asio\curl_exec() is a wrapper around curl_multi_await(). It is easy to use because you don't necessarily have to worry about resource creation: you can just pass it a URL string.

namespace HH\Asio {
  async function curl_exec(mixed $urlOrHandle): Awaitable<string>;
}

Here is an example of getting a vector of URL contents, using a lambda to cut down on the code verbosity that would come with full closure syntax.

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\Curl;

function get_urls(): Vector<string> {
  return Vector {
    "http://example.com",
    "http://example.net",
    "http://example.org",
  };
}

async function get_combined_contents(Vector $urls): Awaitable<Vector<string>> {
  // Use lambda shorthand syntax here instead of full closure syntax
  $handles = $urls->mapWithKey(($idx, $url) ==> \HH\Asio\curl_exec($url));
  $contents = await \HH\Asio\v($handles);
  echo $contents->count();
  return $contents;
}

\HH\Asio\join(get_combined_contents(get_urls()));

Output
3

Streams

The async stream extension has one function, stream_await(), which is functionally similar to HHVM's stream_select(). It waits for a stream to enter a state (e.g., STREAM_AWAIT_READY), but without the multiplexing functionality of stream_select(). You can use HH\Asio\v() to await multiple stream handles, but the resulting combined awaitable won't be complete until all of the underlying streams have completed.

async function stream_await(resource $fp, int $events,
                            float $timeout = 0.0): Awaitable<int>;

This example shows how you can use stream_await() to write to resources.

<?hh

namespace Hack\UserDocumentation\Async\Extensions\Examples\Stream;

function get_resources(): array<resource> {
  $r1 = fopen('php://stdout', 'w');
  $r2 = fopen('php://stdout', 'w');
  $r3 = fopen('php://stdout', 'w');

  return array($r1, $r2, $r3);
}

async function write_all(array<resource> $resources): Awaitable<void> {
  // UNSAFE : the typechecker isn't aware of stream_await until 3.12 :(
  $write_single_resource = async function(resource $r) {
    $status = await stream_await($r, STREAM_AWAIT_WRITE, 1.0);
    if ($status === STREAM_AWAIT_READY) {
      fwrite($r, str_shuffle('ABCDEF') . PHP_EOL);
    }
  };
  // You will get 3 shuffled strings, each on a separate line.
  await \HH\Asio\v(array_map($write_single_resource, $resources));
}

\HH\Asio\join(write_all(get_resources()));

Output
DCFAEB
BAFCDE
ACBDEF
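
stream_await() can wait for readability as well as writability. As a minimal sketch, assuming the STREAM_AWAIT_READ event constant and a 5-second timeout chosen for illustration, the hypothetical helper below waits for a line on standard input and gives up if nothing arrives in time.

```hack
<?hh

// Hypothetical helper: wait up to $timeout seconds for a line of input.
async function read_line_with_timeout(resource $fp,
                                      float $timeout): Awaitable<?string> {
  $status = await stream_await($fp, STREAM_AWAIT_READ, $timeout);
  if ($status !== STREAM_AWAIT_READY) {
    return null; // anything other than "ready" is treated as no input
  }
  $line = fgets($fp);
  return $line === false ? null : rtrim($line, "\n");
}

async function prompt(): Awaitable<void> {
  $line = await read_line_with_timeout(fopen('php://stdin', 'r'), 5.0);
  echo ($line === null ? 'no input' : 'got: ' . $line) . PHP_EOL;
}

\HH\Asio\join(prompt());
```

Checking the returned status before touching the stream is the important part; the same check appears in the write example above.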