example - sqlalchemy create_engine
¿Cómo escribir mi propio dialecto en sqlalchemy para adaptar HTTP API? (0)
Estoy intentando agregar una fuente de datos especial a Superset (una plataforma de exploración de datos). Esta base de datos solo es compatible con la API HTTP y devuelve datos en formato json. ex:
> http://localhost/api/sql/query?q="select * from table"
< [{"id": 1, "value":10}, {"id": 2, "value": 30} ...]
Por lo tanto, tengo que escribir mi propio adaptador en python SQLAlchemy para Superset. He leído el documento y parte del código fuente, pero aún necesito un buen ejemplo para seguir.
¿Podrías darme alguna pista? Cualquier cosa ayuda es bienvenida.
====
ACTUALIZAR:
He resuelto este problema. Esto es lo que hago.
- Vaya a
./site-packages/sqlalchemy/dialects
Copie cualquier dialecto concreto al nuevo (por ejemplo,
zeta
nombre) como punto de inicio. Una mejor manera es usarfrom sqlalchemy.engine.default import DefaultDialect class ZetaDialect(DefaultDialect): ...
Agregue
zeta
a__all__
section of./site-packages/sqlalchemy/dialects/__init__.py
Crea un programa de prueba:
from sqlalchemy import create_engine engine = create_engine(''zeta://XXX'') result = engine.execute("select * from table_name") for row in result: print(row)
Ejecútelo y obtenga un error. Usa
pdb
para encontrar el motivo. En la mayoría de los casos, el motivo es NotImplement algunas interfaces. Resuélvelo uno por uno.Cuando el programa de prueba da la respuesta correcta, casi se ha realizado al 90%. Para completar, también debemos implementar varias interfaces utilizadas por los inspectores:
class ZetaDialect(DefaultDialect): # default_paramstyle = ''qmark'' name = ''zeta'' def __init__(self, **kwargs): DefaultDialect.__init__(self, **kwargs) @classmethod def dbapi(cls): return zeta_dbapi @reflection.cache def get_table_names(self, connection, schema=None, **kw): return [u''table_1'', u''table_2'', ...] @reflection.cache def get_pk_constraint(self, connection, table_name, schema=None, **kw): return [] @reflection.cache def get_foreign_keys(self, connection, table_name, schema=None, **kw): return [] @reflection.cache def get_unique_constraints(self, connection, table_name, schema=None, **kw): return [] @reflection.cache def get_indexes(self, connection, table_name, schema=None, **kw): return [] @reflection.cache def get_schema_names(self, connection, **kw): return [] @reflection.cache def get_columns(self, connection, table_name, schema=None, **kw): # just an example of the column structure result = connection.execute(''select * from %s limit 1'' % table_name) return [{''default'': None, ''autoincrement'': False, ''type'': TEXT, ''name'': colname, ''nullable'': False} for colname, coltype in result.curs
o.description]