Querying from Code

Overview

There are 2 ways you can interact with WeaveQ programmatically:

  1. Query API: Construct and execute queries directly from code, supplying query logic and data by using the Query API (module weaveq.query).
  2. Parser API: Run WeaveQ textual queries - stored as strings in memory - with data from custom sources by using the Parser API (module weaveq.parser).

The WeaveQ command line application is a simple wrapper on top of these APIs. It may be helpful to look at its source code on GitHub for a complete example of how to use them. You can also find in-code documentation in the repository that can be built to HTML using Doxygen.

Data Sources

As the name suggests, a data source is a WeaveQ component that retrieves data from an external source for use in join and pivot operations. They are represented by objects exposing the interface defined by weaveq.query.DataSource and invoked when WeaveQ is ready to retrieve data from the associated query steps. This interface is compatible with both the Query API and Parser API.

A DataSource object is passed a URI string and filter string on initialisation (uri and filter_string parameters to __init__). How this information is used and the form it must take is for each particular DataSource to define. Generally:

  • URIs should indicate the type and location of the data to be retrieved by the data source (for example, a web URL or filename).
  • Filter strings should specify how to selectively include/exclude data from the data source before passing it to WeaveQ.

Your data sources should be prepared to handle filter_string arguments that are None, and raise an exception of the type weaveq.wqexception.DataSourceBuildError if they can’t fall back to default behaviour. Similarly, if your data sources can’t use filter strings, they should raise an exception if passed a filter_string that is not None.

1 of 2 methods are called by WeaveQ to retrieve data from the data source: batch() or stream(). The method used is determined by the stream argument passed to weaveq.query.WeaveQ::execute() - if true, the stream() method is called; otherwise, the batch() method is called. Both must return an iterable object that provides the __getitem__ interface, such as a collections.OrderedDict or generator iterator. Data sources should indicate failure by raising a weaveq.wqexception.DataSourceError exception.

The difference in behaviour of the two methods - if any - is defined by individual data source objects.

An example toy DataSource class is illustrated below:

from collections import OrderedDict
from weaveq.query import DataSource

class ExampleDataSource(DataSource):
    def __init__(self, uri, filter_string):
        super(ExampleDataSource, self).__init__(uri, filter_string)

        self.uri = uri
        self.filter_string = filter_string

def _load_record(self, line):
    result = None

    if (self.filter_string is None):
        result = OrderedDict()
        result["field"] = line
    else:
        if (line.find(self.filter_string) != -1):
            result = collections.OrderedDict()
            result["field"] = line

    return result

def batch(self):
    results = []

    for record in self.stream():
        results.append(record)

    return results

def stream(self):
    with open(self.uri, "rb") as source:
        for line in source:
            record = self._load_record(line)
            if (record is not None):
                yield self._load_record(line)

Query API

A query is a weaveq.query.WeaveQ object. Calls to its pivot_to and join_to methods are chained to build the query steps, specifying weaveq.query.DataSource objects and field relationships as arguments.

A result handler - a callback object exposing the weaveq.query.ResultHandler interface - is assigned to the query using the result_handler() method. This object will be passed all records resulting from the query’s execution.

The query is executed by calling the execute() method.

For example:

from weaveq.query import WeaveQ
from weaveq.relations import F

d1 = ExampleDataSource(1)
d2 = ExampleDataSource(2)

q = WeaveQ(d1).pivot_to(d2, ((F("make") == F("make")) & (F("model") == F("model"))) | (F("color") == F("color")))
q.result_handler(r)
q.execute(stream=True)

The first argument to the pivot_to() method is a data source object exposing the weaveq.query.DataSource interface.

The second argument to the pivot_to() method specifies the field relationships. Fields are denoted using weaveq.relations.F objects, passing a string containing the field name to the constructor. The == and != operators can be used to associated F objects by equality and inequality, respectively. Similarly, the & and | operators can be used to AND and OR field relationship expressions - either individual relationships or sub-expressions - together.

F objects can also optionally be provided with a “field proxy”. This is a callback object to which WeaveQ passes the related field name and value in records retrieved from the data source. The proxy object can return a different value that WeaveQ will use as if it was the original. This can be useful for normalising data or mapping it to discrete values to make comparison easier.

from weaveq.relations import F, FieldProxy

class LowerCaserProxy(FieldProxy):
    def __call__(self, name, value):
        return value.lower()

field = F("make", proxy = example_proxy)

There are some syntactic restrictions when writing field expressions:

  • Always enclose them in curved brackets, i.e. (F("a") == F("b")) instead of F("a") == F("b")
  • The order of F objects being compared defines which data source they are associated with. An object on the left-hand side of the operator denotes a field in the previous step’s data source; an object on the right-hand side denotes a field in the current step’s data source.

The join_to() method works in the same way as pivot_to(), with the exception that it accepts the following additional parameters to control join step options: field (string, defining the name of the field to join to), array (boolean, specifying whether or not the field to join to should be a list) and exclude_empty_joins (boolean, specifying whether or not records that have not resulted in a join should be excluded from results).

pivot_to() and join_to() methods can be chained repeatedly:

q.pivot_to(d2, ...).join_to(d3, ...).pivot_to(d4, ...).join_to(d5, ...)

Parser API

The parser API, encapsulated in weaveq.parser.TextQuery, converts a string containing a WeaveQ text query to a weaveq.query.WeaveQ object. This eliminates the need to build the query - for example, by expressing field relationships - in code.

However, DataSource objects are still required to retrieve the data for each query step and a result handler is required to receive the query results. In order for a query step’s DataSource object to be specified from within the text query itself, rather than provide pre-built objects to the API you instead provide a data source builder. This is an object exposing the weaveq.parser.DataSourceBuilder interface - a callback that is invoked by the text query compiler to convert a data source URI and filter string into a DataSource object.

from weaveq.parser import DataSourceBuilder, TextQuery
from weaveq.wqexception import DataSourceBuildError

class ExampleDataSourceBuilder(DataSourceBuilder):
    def __call__(self, source_uri, filter_string):
        if (source_uri.endswith(".avro")):
            return ExampleAvroFileDataSource(source_uri, filter_string)
        elif (source_uri.endswith(".sqlite")):
            return ExampleSqliteFileDataSource(source_uri, filter_string)
        else:
            raise DataSourceBuildError("Unknown file type")

def run_query(query_string):
    builder = ExampleDataSourceBuilder()
    query_compiler = TextQuery(builder)
    compiled_query = query_compiler.compile_query(query_string)

    result_handler = ExampleResultHandler()
    compiled_query.result_handler(result_handler)

    compiled_query.execute()

The WeaveQ command line interface uses this API and so the same query syntax is supported, with one exception: data source URIs are not separated into a data type and a location. This is because this functionality is in WeaveQ’s own DataSourceBuilder implementation. You can reproduce it easily enough if you want to - take a look at the datasource.py source in the WeaveQ repository to see how it works.