from copy import copy
from functools import reduce
from pypika.enums import (
JoinType,
UnionType,
)
from pypika.terms import (
ArithmeticExpression,
EmptyCriterion,
Field,
Function,
Index,
Rollup,
Star,
Term,
Tuple,
ValueWrapper,
)
from pypika.utils import (
JoinException,
QueryException,
RollupException,
UnionException,
builder,
format_alias_sql,
format_quotes,
ignore_copy,
)
__author__ = "Timothy Heys"
__email__ = "theys@kayak.com"
class Selectable:
    """
    Base class for objects that fields can be selected from.

    Attribute access and item access that do not resolve normally are turned into a ``Field`` bound to this
    object, so ``obj.foo`` and ``obj['foo']`` are both shorthand for ``obj.field('foo')``.
    """

    def __init__(self, alias):
        # ``None`` means that no alias has been assigned.
        self.alias = alias

    @builder
    def as_(self, alias):
        """
        Assigns an alias to this selectable.

        NOTE(review): wrapped by ``builder`` from ``pypika.utils``; presumably the change is applied to a copy
        which is then returned - confirm against the decorator.

        :param alias:
            The alias to assign.
        """
        self.alias = alias

    def field(self, name):
        """
        Creates a field that belongs to this selectable.

        :param name:
            The name of the field.
        :return:
            A ``Field`` whose table is this object.
        """
        return Field(name, table=self)

    @property
    def star(self):
        """A ``Star`` term bound to this selectable."""
        return Star(self)

    # NOTE(review): ``ignore_copy`` comes from ``pypika.utils``; presumably it keeps copy-protocol lookups from
    # being answered with a Field - confirm against the decorator.
    @ignore_copy
    def __getattr__(self, name):
        # Only invoked when normal attribute lookup fails; unknown names become fields.
        return self.field(name)

    @ignore_copy
    def __getitem__(self, name):
        # Subscript access is an alternative spelling of ``field(name)``.
        return self.field(name)
class AliasedQuery(Selectable):
    """
    A named reference to a query. The name doubles as the alias of the selectable.

    Two instances compare equal, and hash equal, when their names match; the wrapped query is not considered.
    """

    def __init__(self, name, query=None):
        super().__init__(alias=name)
        self.name = name
        self.query = query

    def get_sql(self, **kwargs):
        """
        Renders the wrapped query, or just the name when no query was supplied.

        :param kwargs:
            Passed through unchanged to the wrapped query's ``get_sql``.
        :return:
            The SQL string of the wrapped query, or ``self.name``.
        """
        wrapped = self.query
        return self.name if wrapped is None else wrapped.get_sql(**kwargs)

    def __eq__(self, other):
        if not isinstance(other, AliasedQuery):
            return False
        return self.name == other.name

    def __hash__(self):
        # Hash on the string form of the name so it agrees with ``__eq__``.
        return hash(str(self.name))
class Schema:
    """
    A (possibly nested) schema. Unknown attributes resolve to a ``Table`` inside this schema, e.g.
    ``Schema('analytics').events``.
    """

    def __init__(self, name, parent=None):
        self._name = name
        # Enclosing schema, or ``None`` for a top-level one.
        self._parent = parent

    def __eq__(self, other):
        if not isinstance(other, Schema):
            return False
        return self._name == other._name and self._parent == other._parent

    def __ne__(self, other):
        return not self.__eq__(other)

    @ignore_copy
    def __getattr__(self, item):
        # Only reached for names that normal lookup could not find.
        return Table(item, schema=self)

    def get_sql(self, quote_char=None, **kwargs):
        """
        Renders the schema name, prefixed with the dot-separated parent when there is one.

        :param quote_char:
            Quote character handed to ``format_quotes`` and forwarded to the parent.
        :param kwargs:
            Forwarded to the parent's ``get_sql``.
        :return:
            The rendered schema reference.
        """
        # FIXME escape
        schema_sql = format_quotes(self._name, quote_char)
        if self._parent is None:
            return schema_sql

        parent_sql = self._parent.get_sql(quote_char=quote_char, **kwargs)
        return '{parent}.{schema}'.format(parent=parent_sql, schema=schema_sql)
class Database(Schema):
    """
    A ``Schema`` whose unknown attributes resolve to child schemas instead of tables, e.g.
    ``Database('db').analytics`` is a ``Schema`` with this database as its parent.
    """

    @ignore_copy
    def __getattr__(self, item):
        # Only reached for names that normal lookup could not find.
        return Schema(item, parent=self)
class Table(Selectable):
    """
    A database table, optionally qualified by a schema and optionally aliased.
    """

    @staticmethod
    def _init_schema(schema):
        """
        Normalises the ``schema`` constructor argument into a ``Schema`` instance or ``None``.

        :param schema:
            A ``Schema`` instance, a list/tuple of names ordered from outermost to innermost, a single name, or
            ``None``.
        :return:
            The corresponding ``Schema``, or ``None`` when no schema was given.
        """
        # This is a bit complicated in order to support backwards compatibility. It should probably be cleaned up
        # for the next major release.
        if isinstance(schema, Schema):
            return schema

        if isinstance(schema, (list, tuple)):
            # Build the chain outermost-first; every later name becomes a child of the previous one.
            nested = Schema(schema[0])
            for part in schema[1:]:
                nested = Schema(part, parent=nested)
            return nested

        return None if schema is None else Schema(schema)

    def __init__(self, name, schema=None, alias=None):
        super().__init__(alias)
        self._table_name = name
        self._schema = self._init_schema(schema)

    def get_sql(self, **kwargs):
        """
        Renders the table reference: quoted name, schema prefix when present, then the alias.

        :param kwargs:
            ``quote_char`` is read here; everything is forwarded to the schema and to ``format_alias_sql``.
        :return:
            The rendered table reference.
        """
        # FIXME escape
        rendered = format_quotes(self._table_name, kwargs.get('quote_char'))

        if self._schema is not None:
            schema_sql = self._schema.get_sql(**kwargs)
            rendered = '{schema}.{table}'.format(schema=schema_sql, table=rendered)

        return format_alias_sql(rendered, self.alias, **kwargs)

    def __str__(self):
        return self.get_sql(quote_char='"')

    def __eq__(self, other):
        # Tables are equal when name, schema and alias all match.
        if not isinstance(other, Table):
            return False
        return not (self._table_name != other._table_name
                    or self._schema != other._schema
                    or self.alias != other.alias)

    def __repr__(self):
        if not self._schema:
            return "Table('{}')".format(self._table_name)
        return "Table('{}', schema='{}')".format(self._table_name, self._schema)

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Hash on the rendered SQL.
        return hash(str(self))

    def select(self, *terms):
        """
        Perform a SELECT operation on the current table

        :param terms:
            Type: list[expression]

            A list of terms to select. These can be any type of int, float, str, bool or Term or a Field.

        :return: QueryBuilder
        """
        query = Query.from_(self)
        return query.select(*terms)

    def update(self):
        """
        Perform an UPDATE operation on the current table

        :return: QueryBuilder
        """
        return Query.update(self)

    def insert(self, *terms):
        """
        Perform an INSERT operation on the current table

        :param terms:
            Type: list[expression]

            A list of terms to insert. These can be any type of int, float, str, bool or any other valid data

        :return: QueryBuilder
        """
        query = Query.into(self)
        return query.insert(*terms)
def make_tables(*names, **kwargs):
    """
    Shortcut to create many tables at once.

    An entry of ``names`` that is a 2-tuple is read as ``(table name, alias)``. Any other value is used as a whole
    as the table name.

    :param names:
        Table names or ``(name, alias)`` tuples.
    :param kwargs:
        ``schema``: optional schema applied to every created table.
    :return:
        A list of ``Table`` instances in the same order as ``names``.
    """
    schema = kwargs.get('schema')
    return [
        Table(name=entry[0], alias=entry[1], schema=schema)
        if isinstance(entry, tuple) and len(entry) == 2
        else Table(name=entry, schema=schema)
        for entry in names
    ]
class Column:
    """
    A column definition made of a name and an optional type.
    """

    def __init__(self, column_name, column_type=None):
        self.name = column_name
        # Rendered after the name when truthy.
        self.type = column_type

    def get_sql(self, **kwargs):
        """
        Renders the column as the quoted name, followed by a space and the type when a type is set.

        :param kwargs:
            Only ``quote_char`` is read.
        :return:
            The rendered column definition.
        """
        quoted_name = format_quotes(self.name, kwargs.get('quote_char'))
        type_suffix = ' {}'.format(self.type) if self.type else ''
        return '{name}{type}'.format(name=quoted_name, type=type_suffix)

    def __str__(self):
        return self.get_sql(quote_char='"')
def make_columns(*names):
    """
    Shortcut to create many columns at once.

    An entry of ``names`` that is a 2-tuple is read as ``(name, type)``. Any other value is used as a whole as the
    column name.

    :param names:
        Column names or ``(name, type)`` tuples.
    :return:
        A list of ``Column`` instances in the same order as ``names``.
    """
    return [
        Column(column_name=entry[0], column_type=entry[1])
        if isinstance(entry, tuple) and len(entry) == 2
        else Column(column_name=entry)
        for entry in names
    ]
class Query:
    """
    Query is the primary class and entry point in pypika. It is used to build queries iteratively using the builder
    design pattern.

    This class is immutable.
    """

    @classmethod
    def _builder(cls):
        # Single place where the builder is created; every entry point below except ``create_table`` uses it.
        return QueryBuilder()

    @classmethod
    def from_(cls, table):
        """
        Query builder entry point. Initializes query building and sets the table to select from. When using this
        function, the query becomes a SELECT query.

        :param table:
            Type: Table or str

            An instance of a Table object or a string table name.

        :returns QueryBuilder
        """
        query_builder = cls._builder()
        return query_builder.from_(table)

    @classmethod
    def create_table(cls, table):
        """
        Query builder entry point. Initializes query building and sets the table name to be created. When using this
        function, the query becomes a CREATE statement.

        :param table: An instance of a Table object or a string table name.

        :return: CreateQueryBuilder
        """
        create_builder = CreateQueryBuilder()
        return create_builder.create_table(table)

    @classmethod
    def into(cls, table):
        """
        Query builder entry point. Initializes query building and sets the table to insert into. When using this
        function, the query becomes an INSERT query.

        :param table:
            Type: Table or str

            An instance of a Table object or a string table name.

        :returns QueryBuilder
        """
        query_builder = cls._builder()
        return query_builder.into(table)

    @classmethod
    def with_(cls, table, name):
        """
        Query builder entry point. Starts a query by calling ``with_(table, name)`` on a new builder.

        :param table:
            The selectable handed to the builder's ``with_``.
        :param name:
            The name handed to the builder's ``with_``.

        :returns QueryBuilder
        """
        query_builder = cls._builder()
        return query_builder.with_(table, name)

    @classmethod
    def select(cls, *terms):
        """
        Query builder entry point. Initializes query building without a table and selects fields. Useful when testing
        SQL functions.

        :param terms:
            Type: list[expression]

            A list of terms to select. These can be any type of int, float, str, bool, or Term. They cannot be a
            Field unless the function ``Query.from_`` is called first.

        :returns QueryBuilder
        """
        query_builder = cls._builder()
        return query_builder.select(*terms)

    @classmethod
    def update(cls, table):
        """
        Query builder entry point. Initializes query building and sets the table to update. When using this
        function, the query becomes an UPDATE query.

        :param table:
            Type: Table or str

            An instance of a Table object or a string table name.

        :returns QueryBuilder
        """
        query_builder = cls._builder()
        return query_builder.update(table)
class _UnionQuery(Selectable, Term):
    """
    A Query class wrapper for a Union query, whether DISTINCT or ALL.

    Created via the functions `Query.union` or `Query.union_all`, this class should not be instantiated directly.
    """

    def __init__(self, base_query, union_query, union_type, alias=None, wrapper_cls=ValueWrapper):
        super(_UnionQuery, self).__init__(alias)
        self.base_query = base_query
        # List of (union type, query) pairs, rendered in order after the base query.
        self._unions = [(union_type, union_query)]
        # List of (field, direction) pairs; direction may be None.
        self._orderbys = []
        self._limit = None
        self._offset = None
        self._wrapper_cls = wrapper_cls

    def __copy__(self):
        # Mirrors QueryBuilder.__copy__: a default shallow copy would share the list objects below, so appending
        # a union or an ordering on the copy would also change the object it was copied from.
        newone = type(self).__new__(type(self))
        newone.__dict__.update(self.__dict__)
        newone._unions = copy(self._unions)
        newone._orderbys = copy(self._orderbys)
        return newone

    @builder
    def orderby(self, *fields, **kwargs):
        """
        Adds ORDER BY terms that apply to the combined result.

        :param fields:
            Strings are turned into fields of the base query's first FROM table; anything else is passed through
            the base query's ``wrap_constant``.
        :param kwargs:
            ``order``: optional direction stored with every term given in this call.
        """
        for field in fields:
            field = (Field(field, table=self.base_query._from[0])
                     if isinstance(field, str)
                     else self.base_query.wrap_constant(field))

            self._orderbys.append((field, kwargs.get('order')))

    @builder
    def limit(self, limit):
        """Sets the LIMIT of the combined result."""
        self._limit = limit

    @builder
    def offset(self, offset):
        """Sets the OFFSET of the combined result."""
        self._offset = offset

    @builder
    def union(self, other):
        """Appends ``other`` as a distinct UNION."""
        self._unions.append((UnionType.distinct, other))

    @builder
    def union_all(self, other):
        """Appends ``other`` as a UNION ALL."""
        self._unions.append((UnionType.all, other))

    def __add__(self, other):
        return self.union(other)

    def __mul__(self, other):
        return self.union_all(other)

    def __str__(self):
        return self.get_sql()

    def get_sql(self, with_alias=False, subquery=False, **kwargs):
        """
        Renders the base query followed by each unioned query, then ORDER BY, LIMIT and OFFSET.

        :param with_alias:
            When True the result is passed through ``format_alias_sql``.
        :param subquery:
            When True the rendered union is wrapped in parentheses.
        :param kwargs:
            Rendering options forwarded to the member queries. ``dialect`` and ``quote_char`` default to the
            values of the base query.
        :raises UnionException:
            When a unioned query does not select the same number of terms as the base query.
        :return:
            The SQL string.
        """
        union_template = ' UNION{type} {union}'

        kwargs.setdefault('dialect', self.base_query.dialect)
        # This initializes the quote char based on the base query, which could be a dialect specific query class
        # This might be overridden if quote_char is set explicitly in kwargs
        kwargs.setdefault('quote_char', self.base_query.QUOTE_CHAR)

        base_querystring = self.base_query.get_sql(subquery=self.base_query.wrap_union_queries, **kwargs)

        querystring = base_querystring
        for union_type, union_query in self._unions:
            union_querystring = union_query.get_sql(subquery=self.base_query.wrap_union_queries, **kwargs)

            if len(self.base_query._selects) != len(union_query._selects):
                raise UnionException("Queries must have an equal number of select statements in a union."
                                     "\n\nMain Query:\n{query1}\n\nUnion Query:\n{query2}"
                                     .format(query1=base_querystring, query2=union_querystring))

            querystring += union_template.format(type=union_type.value,
                                                 union=union_querystring)

        if self._orderbys:
            querystring += self._orderby_sql(**kwargs)

        # Compare against None so that an explicit ``limit(0)`` still renders ``LIMIT 0``.
        if self._limit is not None:
            querystring += self._limit_sql()

        if self._offset:
            querystring += self._offset_sql()

        if subquery:
            querystring = '({query})'.format(query=querystring)

        if with_alias:
            # NOTE(review): ``_table_name`` is never assigned on this class, so the fallback is resolved through
            # Selectable.__getattr__ unless Term provides it - confirm whether plain ``self.alias`` was intended.
            return format_alias_sql(querystring, self.alias or self._table_name, **kwargs)

        return querystring

    def _orderby_sql(self, quote_char=None, **kwargs):
        """
        Produces the ORDER BY part of the query. This is a list of fields and possibly their directionality, ASC or
        DESC. The clauses are stored in the query under self._orderbys as a list of tuples containing the field and
        directionality (which can be None).

        If an order by field is used in the select clause, determined by a matching alias, then the ORDER BY clause
        will use the alias, otherwise the field will be rendered as SQL.
        """
        clauses = []
        selected_aliases = {s.alias for s in self.base_query._selects}
        for field, directionality in self._orderbys:
            term = format_quotes(field.alias, quote_char) \
                if field.alias and field.alias in selected_aliases \
                else field.get_sql(quote_char=quote_char, **kwargs)

            clauses.append('{term} {orient}'.format(term=term, orient=directionality.value)
                           if directionality is not None else term)

        return ' ORDER BY {orderby}'.format(orderby=','.join(clauses))

    def _offset_sql(self):
        return " OFFSET {offset}".format(offset=self._offset)

    def _limit_sql(self):
        return " LIMIT {limit}".format(limit=self._limit)
class QueryBuilder(Selectable, Term):
    """
    Query Builder is the main class in pypika which stores the state of a query and offers functions which allow the
    state to be branched immutably.
    """
    QUOTE_CHAR = '"'
    SECONDARY_QUOTE_CHAR = "'"
    ALIAS_QUOTE_CHAR = None

    def __init__(self,
                 dialect=None,
                 wrap_union_queries=True,
                 wrapper_cls=ValueWrapper):
        """
        :param dialect:
            Default ``dialect`` rendering option (see ``_set_kwargs_defaults``).
        :param wrap_union_queries:
            Passed as ``subquery`` when the members of a union built from this query are rendered.
        :param wrapper_cls:
            Wrapper class handed to ``wrap_constant`` for constant select terms.
        """
        super(QueryBuilder, self).__init__(None)

        self._from = []
        self._insert_table = None
        self._update_table = None
        self._delete_from = False
        self._replace = False

        self._with = []
        self._selects = []
        self._force_indexes = []
        self._columns = []
        self._values = []
        self._distinct = False
        self._ignore = False

        self._wheres = None
        self._prewheres = None
        self._groupbys = []
        self._with_totals = False
        self._havings = None
        self._orderbys = []
        self._joins = []
        self._unions = []

        self._limit = None
        self._offset = None

        self._updates = []

        self._select_star = False
        self._select_star_tables = set()
        self._mysql_rollup = False
        self._select_into = False

        self._subquery_count = 0
        self._foreign_table = False

        self.dialect = dialect
        self.wrap_union_queries = wrap_union_queries

        self._wrapper_cls = wrapper_cls

    def __copy__(self):
        """
        Returns a shallow copy that owns its own containers, so that appending to a container of the copy does not
        change the query it was copied from.
        """
        newone = type(self).__new__(type(self))
        newone.__dict__.update(self.__dict__)
        newone._select_star_tables = copy(self._select_star_tables)
        newone._from = copy(self._from)
        newone._with = copy(self._with)
        newone._selects = copy(self._selects)
        newone._columns = copy(self._columns)
        newone._values = copy(self._values)
        newone._groupbys = copy(self._groupbys)
        newone._orderbys = copy(self._orderbys)
        newone._joins = copy(self._joins)
        newone._unions = copy(self._unions)
        newone._updates = copy(self._updates)
        # ``force_index`` appends to this list in place, so it needs its own copy like the lists above.
        newone._force_indexes = copy(self._force_indexes)
        return newone

    @builder
    def from_(self, selectable):
        """
        Adds a table to the query. Each call appends another entry to the FROM clause.

        A sub-query or union passed without an alias is given a generated ``sq<N>`` alias (set on the passed
        object itself).

        :param selectable:
            Type: ``Table``, ``Query``, or ``str``

            When a ``str`` is passed, a table with the name matching the ``str`` value is used.

        :returns
            A copy of the query with the table added.
        """
        self._from.append(Table(selectable) if isinstance(selectable, str) else selectable)

        if isinstance(selectable, (QueryBuilder, _UnionQuery)) and selectable.alias is None:
            if isinstance(selectable, QueryBuilder):
                sub_query_count = selectable._subquery_count
            else:
                sub_query_count = 0

            sub_query_count = max(self._subquery_count, sub_query_count)
            selectable.alias = 'sq%d' % sub_query_count
            self._subquery_count = sub_query_count + 1

    @builder
    def replace_table(self, current_table, new_table):
        """
        Replaces all occurrences of the specified table with the new table. Useful when reusing fields across
        queries.

        :param current_table:
            The table instance to be replaced.
        :param new_table:
            The table instance to replace with.
        :return:
            A copy of the query with the tables replaced.
        """
        def swap(table):
            # Only swap a table that is set and actually is the one being replaced; any other table is kept.
            return new_table if table is not None and table == current_table else table

        self._from = [new_table if table == current_table else table for table in self._from]
        self._insert_table = swap(self._insert_table)
        self._update_table = swap(self._update_table)

        self._with = [alias_query.replace_table(current_table, new_table) for alias_query in self._with]
        self._selects = [select.replace_table(current_table, new_table) for select in self._selects]
        self._columns = [column.replace_table(current_table, new_table) for column in self._columns]
        self._values = [[value.replace_table(current_table, new_table) for value in value_list]
                        for value_list
                        in self._values]

        self._wheres = self._wheres.replace_table(current_table, new_table) if self._wheres else None
        self._prewheres = self._prewheres.replace_table(current_table, new_table) if self._prewheres else None
        self._groupbys = [groupby.replace_table(current_table, new_table) for groupby in self._groupbys]
        self._havings = self._havings.replace_table(current_table, new_table) if self._havings else None
        self._orderbys = [(orderby[0].replace_table(current_table, new_table), orderby[1])
                          for orderby in self._orderbys]
        self._joins = [join.replace_table(current_table, new_table) for join in self._joins]

        if current_table in self._select_star_tables:
            self._select_star_tables.remove(current_table)
            self._select_star_tables.add(new_table)

    @builder
    def with_(self, selectable, name):
        """Registers ``selectable`` under ``name`` for the WITH clause."""
        t = AliasedQuery(name, selectable)
        self._with.append(t)

    @builder
    def into(self, table):
        """
        Sets the table to insert into. If terms were already selected the query becomes SELECT ... INTO.

        :param table:
            A ``Table`` or a table name.
        :raises AttributeError:
            When an insert table was already set.
        """
        if self._insert_table is not None:
            raise AttributeError("'Query' object has no attribute '%s'" % 'into')

        if self._selects:
            self._select_into = True

        self._insert_table = table if isinstance(table, Table) else Table(table)

    @builder
    def select(self, *terms):
        """
        Adds terms to the SELECT clause.

        :param terms:
            ``Field`` instances, field names (``str``), functions/arithmetic expressions, or constants, which are
            wrapped with ``wrap_constant``.
        """
        for term in terms:
            if isinstance(term, Field):
                self._select_field(term)
            elif isinstance(term, str):
                self._select_field_str(term)
            elif isinstance(term, (Function, ArithmeticExpression)):
                self._select_other(term)
            else:
                self._select_other(self.wrap_constant(term, wrapper_cls=self._wrapper_cls))

    @builder
    def delete(self):
        """
        Turns the query into a DELETE.

        :raises AttributeError:
            When the query is already a DELETE, has selected terms, or has an update table.
        """
        if self._delete_from or self._selects or self._update_table:
            raise AttributeError("'Query' object has no attribute '%s'" % 'delete')

        self._delete_from = True

    @builder
    def update(self, table):
        """
        Sets the table to update.

        :param table:
            A ``Table`` or a table name.
        :raises AttributeError:
            When an update table is already set, terms were selected, or the query is a DELETE.
        """
        if self._update_table is not None or self._selects or self._delete_from:
            raise AttributeError("'Query' object has no attribute '%s'" % 'update')

        self._update_table = table if isinstance(table, Table) else Table(table)

    @builder
    def columns(self, *terms):
        """
        Adds columns for an INSERT. Strings become fields of the insert table.

        :raises AttributeError:
            When no insert table has been set.
        """
        if self._insert_table is None:
            raise AttributeError("'Query' object has no attribute '%s'" % 'insert')

        for term in terms:
            if isinstance(term, str):
                term = Field(term, table=self._insert_table)
            self._columns.append(term)

    @builder
    def insert(self, *terms):
        """
        Adds values for an INSERT. Calling it without terms changes nothing.

        :param terms:
            Either the values of one row, or one list/tuple/set per row.
        :raises AttributeError:
            When no insert table has been set.
        """
        if self._insert_table is None:
            raise AttributeError("'Query' object has no attribute '%s'" % 'insert')

        if not terms:
            return

        self._validate_terms_and_append(*terms)
        self._replace = False

    @builder
    def replace(self, *terms):
        """
        Adds values like ``insert`` but marks the statement as REPLACE. Calling it without terms changes nothing.

        :raises AttributeError:
            When no insert table has been set.
        """
        if self._insert_table is None:
            raise AttributeError("'Query' object has no attribute '%s'" % 'insert')

        if not terms:
            return

        self._validate_terms_and_append(*terms)
        self._replace = True

    @builder
    def force_index(self, term, *terms):
        """
        Adds indexes for the FORCE INDEX clause. ``Index`` instances are used as given, strings are wrapped in an
        ``Index``, and values of any other type are ignored.
        """
        for t in (term, *terms):
            if isinstance(t, Index):
                self._force_indexes.append(t)
            elif isinstance(t, str):
                self._force_indexes.append(Index(t))

    @builder
    def distinct(self):
        """Marks the SELECT as DISTINCT."""
        self._distinct = True

    @builder
    def ignore(self):
        """Marks the INSERT as INSERT IGNORE."""
        self._ignore = True

    @builder
    def prewhere(self, criterion):
        """ANDs ``criterion`` onto the PREWHERE clause."""
        if not self._validate_table(criterion):
            self._foreign_table = True

        if self._prewheres:
            self._prewheres &= criterion
        else:
            self._prewheres = criterion

    @builder
    def where(self, criterion):
        """ANDs ``criterion`` onto the WHERE clause. An ``EmptyCriterion`` is ignored."""
        if isinstance(criterion, EmptyCriterion):
            return

        if not self._validate_table(criterion):
            self._foreign_table = True

        if self._wheres:
            self._wheres &= criterion
        else:
            self._wheres = criterion

    @builder
    def having(self, criterion):
        """ANDs ``criterion`` onto the HAVING clause."""
        if self._havings:
            self._havings &= criterion
        else:
            self._havings = criterion

    @builder
    def groupby(self, *terms):
        """
        Adds GROUP BY terms. Strings become fields of the first FROM table; integers are wrapped as constants.
        """
        for term in terms:
            if isinstance(term, str):
                term = Field(term, table=self._from[0])
            elif isinstance(term, int):
                term = Field(str(term), table=self._from[0]).wrap_constant(term)

            self._groupbys.append(term)

    @builder
    def with_totals(self):
        """Appends WITH TOTALS to the GROUP BY clause."""
        self._with_totals = True

    @builder
    def rollup(self, *terms, **kwargs):
        """
        Adds a rollup to the GROUP BY clause.

        :param terms:
            Terms to roll up; a list/tuple/set is turned into a ``Tuple`` term.
        :param kwargs:
            ``vendor='mysql'`` appends the terms to the GROUP BY and renders WITH ROLLUP instead of using a
            ``Rollup`` term.
        :raises AttributeError:
            When a MySQL rollup was already applied.
        :raises RollupException:
            For MySQL, when there are neither terms nor existing group-bys.
        """
        for_mysql = 'mysql' == kwargs.get('vendor')

        if self._mysql_rollup:
            raise AttributeError("'Query' object has no attribute '%s'" % 'rollup')

        terms = [Tuple(*term) if isinstance(term, (list, tuple, set))
                 else term
                 for term in terms]

        if for_mysql:
            # MySQL rolls up all of the dimensions always
            if not terms and not self._groupbys:
                raise RollupException('At least one group is required. Call Query.groupby(term) or pass '
                                      'as parameter to rollup.')

            self._mysql_rollup = True
            self._groupbys += terms

        elif 0 < len(self._groupbys) and isinstance(self._groupbys[-1], Rollup):
            # If a rollup was added last, then append the new terms to the previous rollup
            self._groupbys[-1].args += terms

        else:
            self._groupbys.append(Rollup(*terms))

    @builder
    def orderby(self, *fields, **kwargs):
        """
        Adds ORDER BY terms.

        :param fields:
            Strings become fields of the first FROM table; anything else goes through ``wrap_constant``.
        :param kwargs:
            ``order``: optional direction stored with every term given in this call.
        """
        for field in fields:
            field = (Field(field, table=self._from[0])
                     if isinstance(field, str)
                     else self.wrap_constant(field))

            self._orderbys.append((field, kwargs.get('order')))

    @builder
    def join(self, item, how=JoinType.inner):
        """
        Starts a join and returns a ``Joiner`` on which the join condition is chosen.

        :param item:
            A ``Table``, ``QueryBuilder`` or ``AliasedQuery``.
        :param how:
            The join type.
        :raises ValueError:
            When ``item`` is of any other type.
        """
        if isinstance(item, Table):
            return Joiner(self, item, how, type_label='table')

        elif isinstance(item, QueryBuilder):
            return Joiner(self, item, how, type_label='subquery')

        elif isinstance(item, AliasedQuery):
            return Joiner(self, item, how, type_label='table')

        raise ValueError("Cannot join on type '%s'" % type(item))

    def inner_join(self, item):
        """Shortcut for ``join(item, JoinType.inner)``."""
        return self.join(item, JoinType.inner)

    def left_join(self, item):
        """Shortcut for ``join(item, JoinType.left)``."""
        return self.join(item, JoinType.left)

    def right_join(self, item):
        """Shortcut for ``join(item, JoinType.right)``."""
        return self.join(item, JoinType.right)

    def outer_join(self, item):
        """Shortcut for ``join(item, JoinType.outer)``."""
        return self.join(item, JoinType.outer)

    def cross_join(self, item):
        """Shortcut for ``join(item, JoinType.cross)``."""
        return self.join(item, JoinType.cross)

    @builder
    def limit(self, limit):
        """Sets the LIMIT."""
        self._limit = limit

    @builder
    def offset(self, offset):
        """Sets the OFFSET."""
        self._offset = offset

    @builder
    def union(self, other):
        """Combines this query and ``other`` into a distinct UNION."""
        return _UnionQuery(self, other, UnionType.distinct, wrapper_cls=self._wrapper_cls)

    @builder
    def union_all(self, other):
        """Combines this query and ``other`` into a UNION ALL."""
        return _UnionQuery(self, other, UnionType.all, wrapper_cls=self._wrapper_cls)

    @builder
    def set(self, field, value):
        """
        Adds a ``field = value`` assignment for an UPDATE.

        :param field:
            A ``Field`` or a field name.
        :param value:
            The value, which is wrapped in a ``ValueWrapper``.
        """
        field = Field(field) if not isinstance(field, Field) else field
        self._updates.append((field, ValueWrapper(value)))

    def __add__(self, other):
        return self.union(other)

    def __mul__(self, other):
        return self.union_all(other)

    @builder
    def __getitem__(self, item):
        """
        Slice syntax for paging: ``query[start:stop]`` sets OFFSET to ``start`` and LIMIT to ``stop``.

        :raises TypeError:
            When ``item`` is not a slice.
        """
        if not isinstance(item, slice):
            raise TypeError("'Query' object is not subscriptable")
        self._offset = item.start
        self._limit = item.stop

    @staticmethod
    def _list_aliases(field_set, quote_char=None):
        # For every field: its alias when set, otherwise its rendered SQL.
        return [field.alias or field.get_sql(quote_char=quote_char)
                for field in field_set]

    def _select_field_str(self, term):
        """
        Selects a field given by name from the first FROM table; ``'*'`` selects everything.

        :raises QueryException:
            When no FROM table has been set.
        """
        if 0 == len(self._from):
            raise QueryException('Cannot select {term}, no FROM table specified.'
                                 .format(term=term))

        if term == '*':
            self._select_star = True
            self._selects = [Star()]
            return

        self._select_field(Field(term, table=self._from[0]))

    def _select_field(self, term):
        """Adds a field to the SELECT clause unless a star selection already covers it."""
        if self._select_star:
            # Do not add select terms after a star is selected
            return

        if term.table in self._select_star_tables:
            # Do not add select terms for table after a table star is selected
            return

        if isinstance(term, Star):
            # A table star supersedes the fields of that table which were selected earlier.
            self._selects = [select
                             for select in self._selects
                             if not hasattr(select, 'table') or term.table != select.table]
            self._select_star_tables.add(term.table)

        self._selects.append(term)

    def _select_other(self, function):
        self._selects.append(function)

    def fields(self):
        # Don't return anything here. Subqueries have their own fields.
        return []

    def do_join(self, join):
        """
        Validates ``join`` and appends it to the query. A joined sub-query without alias gets a generated one.

        :param join:
            The join object; it must provide ``validate`` and ``item``.
        """
        base_tables = self._from + [self._update_table] + self._with
        join.validate(base_tables, self._joins)

        if isinstance(join.item, QueryBuilder) and join.item.alias is None:
            self._tag_subquery(join.item)

        table_in_query = any(isinstance(clause, Table)
                             and join.item in base_tables
                             for clause in base_tables)
        if isinstance(join.item, Table) and join.item.alias is None and table_in_query:
            # On the odd chance that we join the same table as the FROM table and don't set an alias
            # FIXME only works once
            join.item.alias = join.item._table_name + '2'

        self._joins.append(join)

    def is_joined(self, table):
        """Returns True when ``table`` is the item of one of the joins of this query."""
        return any(table == join.item for join in self._joins)

    def _validate_table(self, term):
        """
        Returns False if the term references a table not already part of the
        FROM clause or JOINS and True otherwise.
        """
        base_tables = self._from + [self._update_table]

        for field in term.fields():
            table_in_base_tables = field.table in base_tables
            table_in_joins = field.table in [join.item for join in self._joins]
            if field.table is not None \
                    and not table_in_base_tables \
                    and not table_in_joins \
                    and field.table != self._update_table:
                return False
        return True

    def _tag_subquery(self, subquery):
        # Assign the next generated ``sq<N>`` alias to the sub-query.
        subquery.alias = 'sq%d' % self._subquery_count
        self._subquery_count += 1

    def _validate_terms_and_append(self, *terms):
        """
        Handy function for INSERT and REPLACE statements in order to check if
        terms are introduced and how append them to `self._values`
        """
        if not isinstance(terms[0], (list, tuple, set)):
            # Plain values were passed: treat them as the values of one row.
            terms = [terms]

        for values in terms:
            self._values.append([value
                                 if isinstance(value, Term)
                                 else self.wrap_constant(value)
                                 for value in values])

    def __str__(self):
        return self.get_sql(dialect=self.dialect)

    def __repr__(self):
        return self.__str__()

    def __eq__(self, other):
        # Only the type and the alias are compared.
        if not isinstance(other, QueryBuilder):
            return False

        if not self.alias == other.alias:
            return False

        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        return hash(self.alias) + sum(hash(clause) for clause in self._from)

    def _set_kwargs_defaults(self, kwargs):
        # Fill in rendering options the caller did not set explicitly.
        kwargs.setdefault('quote_char', self.QUOTE_CHAR)
        kwargs.setdefault('secondary_quote_char', self.SECONDARY_QUOTE_CHAR)
        kwargs.setdefault('alias_quote_char', self.ALIAS_QUOTE_CHAR)
        kwargs.setdefault('dialect', self.dialect)

    def get_sql(self, with_alias=False, subquery=False, **kwargs):
        """
        Renders the query as SQL.

        :param with_alias:
            When True the result is passed through ``format_alias_sql`` with the alias of this query.
        :param subquery:
            When True the rendered query is wrapped in parentheses.
        :param kwargs:
            Rendering options; missing ones are filled in by ``_set_kwargs_defaults``.
        :return:
            The SQL string, or an empty string when the query is not complete enough to render.
        """
        self._set_kwargs_defaults(kwargs)

        if not (self._selects or self._insert_table or self._delete_from or self._update_table):
            return ''
        if self._insert_table and not (self._selects or self._values):
            return ''
        if self._update_table and not self._updates:
            return ''

        has_joins = bool(self._joins)
        has_multiple_from_clauses = 1 < len(self._from)
        has_subquery_from_clause = 0 < len(self._from) and isinstance(self._from[0], QueryBuilder)
        has_reference_to_foreign_table = self._foreign_table

        # Fields are rendered with their table namespace as soon as more than one source is involved.
        kwargs['with_namespace'] = any([
            has_joins,
            has_multiple_from_clauses,
            has_subquery_from_clause,
            has_reference_to_foreign_table
        ])

        if self._update_table:
            querystring = self._update_sql(**kwargs)

            if self._joins:
                querystring += " " + " ".join(join.get_sql(**kwargs)
                                              for join in self._joins)

            querystring += self._set_sql(**kwargs)

            if self._wheres:
                querystring += self._where_sql(**kwargs)

            # Compare against None so that an explicit ``limit(0)`` still renders ``LIMIT 0``.
            if self._limit is not None:
                querystring += self._limit_sql()

            return querystring

        if self._delete_from:
            querystring = self._delete_sql(**kwargs)

        elif not self._select_into and self._insert_table:
            if self._replace:
                querystring = self._replace_sql(**kwargs)
            else:
                querystring = self._insert_sql(**kwargs)

            if self._columns:
                querystring += self._columns_sql(**kwargs)

            if self._values:
                querystring += self._values_sql(**kwargs)
                return querystring
            else:
                querystring += ' ' + self._select_sql(**kwargs)

        else:
            if self._with:
                querystring = self._with_sql(**kwargs)
            else:
                querystring = ''

            querystring += self._select_sql(**kwargs)

            if self._insert_table:
                querystring += self._into_sql(**kwargs)

        if self._from:
            querystring += self._from_sql(**kwargs)

        if self._force_indexes:
            querystring += self._force_index_sql(**kwargs)

        if self._joins:
            querystring += " " + " ".join(join.get_sql(**kwargs)
                                          for join in self._joins)

        if self._prewheres:
            querystring += self._prewhere_sql(**kwargs)

        if self._wheres:
            querystring += self._where_sql(**kwargs)

        if self._groupbys:
            querystring += self._group_sql(**kwargs)
            if self._mysql_rollup:
                querystring += self._rollup_sql()

        if self._havings:
            querystring += self._having_sql(**kwargs)

        if self._orderbys:
            querystring += self._orderby_sql(**kwargs)

        # Compare against None so that an explicit ``limit(0)`` still renders ``LIMIT 0``.
        if self._limit is not None:
            querystring += self._limit_sql()

        if self._offset:
            querystring += self._offset_sql()

        if subquery:
            querystring = '({query})'.format(query=querystring)

        if with_alias:
            return format_alias_sql(querystring, self.alias, **kwargs)

        return querystring

    def _with_sql(self, **kwargs):
        return 'WITH ' + ','.join(
            clause.name + ' AS (' + clause.get_sql(
                subquery=False,
                with_alias=False,
                **kwargs) +
            ') '
            for clause in self._with)

    def _select_sql(self, **kwargs):
        return 'SELECT {distinct}{select}'.format(
            distinct='DISTINCT ' if self._distinct else '',
            select=','.join(term.get_sql(with_alias=True, subquery=True, **kwargs)
                            for term in self._selects),
        )

    def _insert_sql(self, **kwargs):
        return 'INSERT {ignore}INTO {table}'.format(
            table=self._insert_table.get_sql(**kwargs),
            ignore='IGNORE ' if self._ignore else ''
        )

    def _replace_sql(self, **kwargs):
        return 'REPLACE INTO {table}'.format(
            table=self._insert_table.get_sql(**kwargs),
        )

    @staticmethod
    def _delete_sql(**kwargs):
        return 'DELETE'

    def _update_sql(self, **kwargs):
        return 'UPDATE {table}'.format(
            table=self._update_table.get_sql(**kwargs)
        )

    def _columns_sql(self, with_namespace=False, **kwargs):
        """
        SQL for Columns clause for INSERT queries

        :param with_namespace:
            Remove from kwargs, never format the column terms with namespaces since only one table can be
            inserted into
        """
        return ' ({columns})'.format(
            columns=','.join(term.get_sql(with_namespace=False, **kwargs)
                             for term in self._columns)
        )

    def _values_sql(self, **kwargs):
        # One parenthesised, comma separated group per row.
        return ' VALUES ({values})' \
            .format(values='),('
                    .join(','
                          .join(term.get_sql(with_alias=True, subquery=True, **kwargs)
                                for term in row)
                          for row in self._values))

    def _into_sql(self, **kwargs):
        return ' INTO {table}'.format(
            table=self._insert_table.get_sql(with_alias=False, **kwargs),
        )

    def _from_sql(self, with_namespace=False, **kwargs):
        # ``with_namespace`` is taken out of kwargs here and not forwarded to the FROM clauses.
        return ' FROM {selectable}'.format(selectable=','.join(
            clause.get_sql(subquery=True, with_alias=True, **kwargs)
            for clause in self._from
        ))

    def _force_index_sql(self, **kwargs):
        return ' FORCE INDEX ({indexes})'.format(indexes=','.join(
            index.get_sql(**kwargs)
            for index in self._force_indexes),
        )

    def _prewhere_sql(self, quote_char=None, **kwargs):
        return ' PREWHERE {prewhere}'.format(
            prewhere=self._prewheres.get_sql(quote_char=quote_char, subquery=True, **kwargs))

    def _where_sql(self, quote_char=None, **kwargs):
        return ' WHERE {where}'.format(where=self._wheres.get_sql(quote_char=quote_char, subquery=True, **kwargs))

    def _group_sql(self, quote_char=None, alias_quote_char=None, groupby_alias=True, **kwargs):
        """
        Produces the GROUP BY part of the query.  This is a list of fields. The clauses are stored in the query under
        self._groupbys as a list fields.

        If an groupby field is used in the select clause,
        determined by a matching alias, and the groupby_alias is set True
        then the GROUP BY clause will use the alias,
        otherwise the entire field will be rendered as SQL.
        """
        clauses = []
        selected_aliases = {s.alias for s in self._selects}
        for field in self._groupbys:
            if groupby_alias and field.alias and field.alias in selected_aliases:
                clauses.append(format_quotes(field.alias, alias_quote_char or quote_char))
            else:
                clauses.append(field.get_sql(quote_char=quote_char, alias_quote_char=alias_quote_char, **kwargs))

        sql = ' GROUP BY {groupby}'.format(groupby=','.join(clauses))

        if self._with_totals:
            return sql + ' WITH TOTALS'

        return sql

    def _orderby_sql(self, quote_char=None, alias_quote_char=None, orderby_alias=True, **kwargs):
        """
        Produces the ORDER BY part of the query.  This is a list of fields and possibly their directionality, ASC or
        DESC. The clauses are stored in the query under self._orderbys as a list of tuples containing the field and
        directionality (which can be None).

        If an order by field is used in the select clause,
        determined by a matching alias, and the orderby_alias
        is set True then the ORDER BY clause will use
        the alias, otherwise the field will be rendered as SQL.
        """
        clauses = []
        selected_aliases = {s.alias for s in self._selects}
        for field, directionality in self._orderbys:
            term = format_quotes(field.alias, alias_quote_char or quote_char) \
                if orderby_alias and field.alias and field.alias in selected_aliases \
                else field.get_sql(quote_char=quote_char, alias_quote_char=alias_quote_char, **kwargs)

            clauses.append('{term} {orient}'.format(term=term, orient=directionality.value)
                           if directionality is not None else term)

        return ' ORDER BY {orderby}'.format(orderby=','.join(clauses))

    def _rollup_sql(self):
        return ' WITH ROLLUP'

    def _having_sql(self, quote_char=None, **kwargs):
        return ' HAVING {having}'.format(having=self._havings.get_sql(quote_char=quote_char, **kwargs))

    def _offset_sql(self):
        return " OFFSET {offset}".format(offset=self._offset)

    def _limit_sql(self):
        return " LIMIT {limit}".format(limit=self._limit)

    def _set_sql(self, **kwargs):
        return ' SET {set}'.format(
            set=','.join(
                '{field}={value}'.format(
                    field=field.get_sql(**kwargs),
                    value=value.get_sql(**kwargs)) for field, value in self._updates
            )
        )
class Joiner:
    """
    Intermediate object of a join in progress. It holds the query, the item being joined and the join type until
    one of ``on``, ``on_field``, ``using`` or ``cross`` supplies the join condition and hands the finished join to
    ``query.do_join``.
    """

    def __init__(self, query, item, how, type_label):
        self.query = query
        self.item = item
        self.how = how
        # Used in error messages to name the kind of item being joined.
        self.type_label = type_label

    def on(self, criterion, collate=None):
        """
        Joins on an explicit criterion.

        :param criterion:
            The join condition; must not be None.
        :param collate:
            Passed through to ``JoinOn``.
        :raises JoinException:
            When ``criterion`` is None.
        :return:
            The query this joiner was created for.
        """
        if criterion is None:
            raise JoinException("Parameter 'criterion' is required for a "
                                "{type} JOIN but was not supplied.".format(type=self.type_label))

        join = JoinOn(self.item, self.how, criterion, collate)
        self.query.do_join(join)
        return self.query

    def on_field(self, *fields):
        """
        Joins on equality of identically named fields of the query's first FROM table and the joined item.

        :param fields:
            One or more field names; the per-field equalities are combined with ``&``.
        :raises JoinException:
            When no field is given.
        :return:
            The query this joiner was created for.
        """
        if not fields:
            raise JoinException("Parameter 'fields' is required for a "
                                "{type} JOIN but was not supplied.".format(type=self.type_label))

        base_table = self.query._from[0]
        criterion = None
        for name in fields:
            equality = Field(name, table=base_table) == Field(name, table=self.item)
            criterion = equality if criterion is None else criterion & equality

        join = JoinOn(self.item, self.how, criterion)
        self.query.do_join(join)
        return self.query

    def using(self, *fields):
        """
        Joins with a USING clause over the given field names.

        :param fields:
            One or more field names.
        :raises JoinException:
            When no field is given.
        :return:
            The query this joiner was created for.
        """
        if not fields:
            raise JoinException("Parameter 'fields' is required when joining with "
                                "a using clause but was not supplied.")

        using_fields = [Field(name) for name in fields]
        self.query.do_join(JoinUsing(self.item, self.how, using_fields))
        return self.query

    def cross(self):
        """Return cross join"""
        join = Join(self.item, JoinType.cross)
        self.query.do_join(join)
        return self.query
class Join:
    """
    A join of an item into a query, without any join condition.
    """

    def __init__(self, item, how):
        """
        :param item:
            The item being joined.
        :param how:
            The join type; its ``value`` is used as the SQL prefix of the JOIN keyword.
        """
        self.item = item
        self.how = how

    def get_sql(self, **kwargs):
        """
        Renders the join as ``[<type> ]JOIN <item>``.

        :return:
            The JOIN clause. The type prefix is omitted when ``how.value`` is falsy.
        """
        item_sql = self.item.get_sql(subquery=True, with_alias=True, **kwargs)
        join_sql = 'JOIN {table}'.format(table=item_sql)

        join_type = self.how.value
        if not join_type:
            return join_sql
        return '{type} {join}'.format(join=join_sql, type=join_type)

    def validate(self, _from, _joins):
        """
        Hook for checking the join against the query's FROM items and existing joins.
        A plain join has no criterion, so there is nothing to check.
        """
        pass

    @builder
    def replace_table(self, current_table, new_table):
        """
        Replaces all occurrences of the specified table with the new table. Useful when reusing
        fields across queries.

        :param current_table:
            The table to be replaced.
        :param new_table:
            The table to replace with.
        :return:
            A copy of the join with the tables replaced.
        """
        # NOTE(review): this delegates to the item's own replace_table, whereas JoinOn and JoinUsing swap
        # the item by equality. Assumes the joined item implements replace_table - TODO confirm.
        replaced_item = self.item.replace_table(current_table, new_table)
        self.item = replaced_item
class JoinOn(Join):
    """
    A join with an ON criterion and an optional collation.
    """

    def __init__(self, item, how, criteria, collate=None):
        """
        :param item:
            The item being joined.
        :param how:
            The join type.
        :param criteria:
            The criterion rendered after ON.
        :param collate:
            Optional collation appended after the criterion.
        """
        super(JoinOn, self).__init__(item, how)
        self.criterion = criteria
        self.collate = collate

    def get_sql(self, **kwargs):
        """
        Renders the join as ``<join> ON <criterion>[ COLLATE <collate>]``.

        :return:
            The JOIN clause including its ON criterion.
        """
        join_sql = super(JoinOn, self).get_sql(**kwargs)
        criterion_sql = self.criterion.get_sql(subquery=True, **kwargs)
        collate_sql = " COLLATE {}".format(self.collate) if self.collate else ""

        return '{join} ON {criterion}{collate}'.format(
            join=join_sql,
            criterion=criterion_sql,
            collate=collate_sql,
        )

    def validate(self, _from, _joins):
        """
        Checks that every table referenced by the criterion is available to the query.

        :param _from:
            The items in the FROM clause of the query.
        :param _joins:
            The joins already present on the query.
        :raises JoinException:
            If the criterion references a table that is neither in ``_from``, nor the item of an existing
            join, nor the item of this join.
        """
        referenced_tables = {field.table for field in self.criterion.fields()}

        reachable_tables = set(_from)
        reachable_tables.update(join.item for join in _joins)
        reachable_tables.add(self.item)

        unknown_tables = referenced_tables - reachable_tables
        if not unknown_tables:
            return

        raise JoinException('Invalid join criterion. One field is required from the joined item and '
                            'another from the selected table or an existing join. Found [{tables}]'.format(
            tables=', '.join(map(str, unknown_tables))
        ))

    @builder
    def replace_table(self, current_table, new_table):
        """
        Replaces all occurrences of the specified table with the new table. Useful when reusing
        fields across queries.

        :param current_table:
            The table to be replaced.
        :param new_table:
            The table to replace with.
        :return:
            A copy of the join with the tables replaced.
        """
        if self.item == current_table:
            self.item = new_table
        self.criterion = self.criterion.replace_table(current_table, new_table)
class JoinUsing(Join):
    """
    A join with a USING clause over a list of fields.
    """

    def __init__(self, item, how, fields):
        """
        :param item:
            The item being joined.
        :param how:
            The join type.
        :param fields:
            The fields rendered inside the USING clause.
        """
        super(JoinUsing, self).__init__(item, how)
        self.fields = fields

    def get_sql(self, **kwargs):
        """
        Renders the join as ``<join> USING (<field>,<field>,...)``.

        :return:
            The JOIN clause including its USING clause.
        """
        join_sql = super(JoinUsing, self).get_sql(**kwargs)
        rendered_fields = [using_field.get_sql(**kwargs) for using_field in self.fields]

        return '{join} USING ({fields})'.format(
            join=join_sql,
            fields=','.join(rendered_fields),
        )

    def validate(self, _from, _joins):
        """
        Hook for checking the join against the query's FROM items and existing joins.
        Nothing is checked for a USING join.
        """
        pass

    @builder
    def replace_table(self, current_table, new_table):
        """
        Replaces all occurrences of the specified table with the new table. Useful when reusing
        fields across queries.

        :param current_table:
            The table to be replaced.
        :param new_table:
            The table to replace with.
        :return:
            A copy of the join with the tables replaced.
        """
        if self.item == current_table:
            self.item = new_table
        self.fields = [using_field.replace_table(current_table, new_table) for using_field in self.fields]
class CreateQueryBuilder:
    """
    Query builder used to build CREATE queries.

    A table is created either from a list of columns (``columns``) or from the result of a select query
    (``as_select``); the two are mutually exclusive.
    """
    QUOTE_CHAR = '"'
    SECONDARY_QUOTE_CHAR = "'"
    ALIAS_QUOTE_CHAR = None

    def __init__(self, dialect=None):
        """
        :param dialect:
            Optional dialect, passed on to ``get_sql`` of the rendered parts as the ``dialect`` keyword.
        """
        self._create_table = None
        self._temporary = False
        self._as_select = None
        self._columns = []
        self.dialect = dialect

    def _set_kwargs_defaults(self, kwargs):
        """
        Fills in the builder's quoting and dialect defaults for any rendering option the caller did not set.

        :param kwargs:
            The keyword arguments given to ``get_sql``. Modified in place.
        """
        kwargs.setdefault('quote_char', self.QUOTE_CHAR)
        kwargs.setdefault('secondary_quote_char', self.SECONDARY_QUOTE_CHAR)
        kwargs.setdefault('dialect', self.dialect)

    def get_sql(self, **kwargs):
        """
        Renders the CREATE TABLE statement.

        :return:
            The SQL string, or an empty string when no table has been set or when neither columns nor a
            select query have been provided.
        """
        self._set_kwargs_defaults(kwargs)

        if not self._create_table:
            return ''

        if not self._columns and not self._as_select:
            return ''

        querystring = self._create_table_sql(**kwargs)

        # columns() and as_select() reject each other, so at most one of the two is set here.
        if self._columns:
            querystring += self._columns_sql(**kwargs)
        else:
            querystring += self._as_select_sql(**kwargs)

        return querystring

    @builder
    def create_table(self, table):
        """
        Sets the table to create.

        :param table:
            A Table instance, or a value that is passed to ``Table`` to build one.
        :raises AttributeError:
            If the table to create has already been set.
        """
        if self._create_table:
            raise AttributeError("'Query' object already has attribute create_table")

        self._create_table = table if isinstance(table, Table) else Table(table)

    @builder
    def temporary(self):
        """
        Marks the table as TEMPORARY.
        """
        self._temporary = True

    @builder
    def columns(self, *columns):
        """
        Adds column definitions to the table.

        :param columns:
            Each entry is either a string (wrapped in ``Column``), a tuple whose first two items are the
            column name and the column type, or any other object, which is appended unchanged.
        :raises AttributeError:
            If a select query has already been set with ``as_select``.
        """
        if self._as_select:
            raise AttributeError("'Query' object already has attribute as_select")

        for column in columns:
            if isinstance(column, str):
                column = Column(column)
            elif isinstance(column, tuple):
                column = Column(column_name=column[0], column_type=column[1])
            self._columns.append(column)

    @builder
    def as_select(self, query_builder):
        """
        Creates the table from the result of a select query.

        :param query_builder:
            The QueryBuilder whose SQL is used as the table's source.
        :raises AttributeError:
            If columns have already been set with ``columns``.
        :raises TypeError:
            If ``query_builder`` is not a QueryBuilder instance.
        """
        if self._columns:
            raise AttributeError("'Query' object already has attribute columns")

        if not isinstance(query_builder, QueryBuilder):
            # The message names the actual parameter so the caller can tell which argument was rejected.
            raise TypeError("Expected 'query_builder' to be instance of QueryBuilder")

        self._as_select = query_builder

    def _create_table_sql(self, **kwargs):
        """
        Renders ``CREATE [TEMPORARY ]TABLE <table>``.
        """
        return 'CREATE {temporary}TABLE {table}'.format(
            temporary='TEMPORARY ' if self._temporary else '',
            table=self._create_table.get_sql(**kwargs),
        )

    def _columns_sql(self, **kwargs):
        """
        Renders the parenthesised, comma separated column definitions, with a leading space.
        """
        return ' ({columns})'.format(
            columns=','.join(column.get_sql(**kwargs)
                             for column in self._columns)
        )

    def _as_select_sql(self, **kwargs):
        """
        Renders the ``AS (<query>)`` part, with a leading space.
        """
        return ' AS ({query})'.format(
            query=self._as_select.get_sql(**kwargs),
        )

    def __str__(self):
        return self.get_sql()

    def __repr__(self):
        return self.__str__()