集成与扩展

为了方便集成与扩展, ChannelQuery 提供可定制化的能力。

内嵌到应用

引入 ChannelQuery 脚本引擎 Jar 包。

示例

val script = "..."
val paramsContext = mutable.Map[String, AnyRef]()
....
val context = mutable.Map[String, AnyRef]()
// 你的 SparkSession 对象
val rt = SparkSQL.realtime()
val ds = ChannelEngine.execute(script, context, rt, paramsContext).asInstanceOf[Dataset[Row]]
val r = ds.limit(200).toJSON.collect()
// 停止会话,销毁所有临时视图
rt.stop()

自定义JDBC数据源

默认读取 fs/data.sources.json 配置文件,根据数据源名称查找,然后创建数据源给 loadsave 语法使用。

-- demo 是已配置的数据源名称
load jdbc 'select * from orders' option ds='demo';

自定义JDBC数据源需实现 com.torchdb.framework.lang.IDataSource 接口,以下为内置数据源实现(Scala版,其它JVM语言参考自身实现接口方式):

class DefaultDataSource extends IDataSource {
    override def source(name: String): DataSource = {
        var dsName = name
        if(name.isEmpty){
            dsName = "default"
        }
        val path = FilenameUtils.concat(System.getProperty("user.dir"), "fs/data.sources.json")
        val file = new File(path)
        if (file.exists()) {
            val mapper = new ObjectMapper()
            val nodes = mapper.readTree(file)
            if (nodes.isArray) {
                val it = nodes.iterator()
                while (it.hasNext) {
                    val node = it.next()
                    if (node.get("name").asText().equalsIgnoreCase(dsName)) {
                        return DataSource(node.get("url").asText(), node.get("user").asText(),
                        node.get("password").asText(), node.get("driver").asText(),
                        node.get("queryTimeout").asInt(), node.get("fetchSize").asInt())
                    }
                }
            }
        }
        DataSource("", "", "", "", 60, 10000)
    }
}

然后在脚本执行前注入:

ChannelEngine.dataSourceImplClassName="com.torchdb.framework.lang.DefaultDataSource"

自定义管道函数(UDCF)

当前数据驱动是建构在Spark之上,所以管道函数均以 Dataset[Row] 作为参数,返回处理后的 Dataset[Row]。

自定义管道函数需实现 com.torchdb.framework.lang.IFunction 接口,以下为内置row函数实现(Scala版,其它JVM语言参考自身实现接口方式):

class RowAlias extends IFunction {
    override def call(value: AnyRef, args: List[String], context: Map[String, AnyRef]): AnyRef = {

        value match {
            case ds: Dataset[org.apache.spark.sql.Row] =>
                val rows = new java.util.ArrayList[org.apache.spark.sql.Row]()
                val results = ds.collect()
                args.foreach(arg => {
                    rows.add(results(Math.abs(arg.toInt) - 1))
                })
                if (rows.isEmpty) {
                    rows.add(results(0))
                }
                val s = context("sparkSession").asInstanceOf[SparkSession]
                return s.createDataFrame(rows, ds.schema)
            case _ =>
        }
        value
    }
}

最后,在执行脚本前注册函数

ChannelEngine.registry(funcName, funcImplClassName)