集成与扩展¶
为了方便集成与扩展, ChannelQuery 提供可定制化的能力。
内嵌到应用¶
引入 ChannelQuery 脚本引擎 Jar
包。
示例¶
val script = "..."
val paramsContext = mutable.Map[String, AnyRef]()
....
val context = mutable.Map[String, AnyRef]()
// 你的 SparkSession 对象
val rt = SparkSQL.realtime()
val ds = ChannelEngine.execute(script, context, rt, paramsContext).asInstanceOf[Dataset[Row]]
val r = ds.limit(200).toJSON.collect()
// 停止会话,销毁所有临时视图
rt.stop()
自定义JDBC数据源¶
默认读取 fs/data.sources.json
配置文件,根据数据源名称查找,然后创建数据源给 load
、 save
语法使用。
-- demo 是已配置的数据源名称
load jdbc 'select * from orders' option ds='demo';
自定义JDBC数据源需实现 com.torchdb.framework.lang.IDataSource
接口,以下为内置数据源实现(Scala版,其它JVM语言参考自身实现接口方式):
class DefaultDataSource extends IDataSource {
override def source(name: String): DataSource = {
var dsName = name
if(name.isEmpty){
dsName = "default"
}
val path = FilenameUtils.concat(System.getProperty("user.dir"), "fs/data.sources.json")
val file = new File(path)
if (file.exists()) {
val mapper = new ObjectMapper()
val nodes = mapper.readTree(file)
if (nodes.isArray) {
val it = nodes.iterator()
while (it.hasNext) {
val node = it.next()
if (node.get("name").asText().equalsIgnoreCase(dsName)) {
return DataSource(node.get("url").asText(), node.get("user").asText(),
node.get("password").asText(), node.get("driver").asText(),
node.get("queryTimeout").asInt(), node.get("fetchSize").asInt())
}
}
}
}
DataSource("", "", "", "", 60, 10000)
}
}
然后在脚本执行前注入:
ChannelEngine.dataSourceImplClassName="com.torchdb.framework.lang.DefaultDataSource"
自定义管道函数(UDCF)¶
当前数据驱动是建构在Spark之上,所以管道函数均以 Dataset[Row] 作为参数,返回处理后的 Dataset[Row]。
自定义管道函数需实现 com.torchdb.framework.lang.IFunction
接口,以下为内置row函数实现(Scala版,其它JVM语言参考自身实现接口方式):
class RowAlias extends IFunction {
override def call(value: AnyRef, args: List[String], context: Map[String, AnyRef]): AnyRef = {
value match {
case ds: Dataset[org.apache.spark.sql.Row] =>
val rows = new java.util.ArrayList[org.apache.spark.sql.Row]()
val results = ds.collect()
args.foreach(arg => {
rows.add(results(Math.abs(arg.toInt) - 1))
})
if (rows.isEmpty) {
rows.add(results(0))
}
val s = context("sparkSession").asInstanceOf[SparkSession]
return s.createDataFrame(rows, ds.schema)
case _ =>
}
value
}
}
最后,在执行脚本前注册函数
ChannelEngine.registry(funcName, funcImplClassName)