





"From Humanity's River Civilizations: Insights into the Importance of Data Flow"

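Solution 1: RDS PG + OSS + HDB PG, minute-level cleansing and active detection

After data is consumed from the message queue, it is written to RDS PG in real time. The order FEEDs are merged in RDS PG and then written to an OSS external table. (Compressed formats are supported. Measured as raw data, the write speed to OSS is about 100 MB/s per session.)

HDB PG reads from the OSS external table (compressed formats are supported; measured as raw data, the read speed from OSS is about 100 MB/s per data node) and merges the order FEED data into the full order table.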


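"Building a Closed Loop of Business Data across Cloud Stream Computing, Online Business, and Data Analysis: Alibaba Cloud RDS and HybridDB for PostgreSQL Best Practices"

Once the data is in HDB PG, rule SQL is run against the full order table to find abnormal data (or to run analysis).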





In fact, RDS PostgreSQL has an even stronger feature: it can detect abnormal FEED data and report it within milliseconds.






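The application listens on a message channel (listen channel), and the database writes abnormal data to that channel (notify channel, message). Abnormal data is thus pushed to the application actively and asynchronously.


Another reason millisecond-level FEED monitoring was not done before is that HBASE has a high merge latency, which caused errors when the stream computation tried to fill in missing fields. Doing the anomaly monitoring in RDS PG removes this problem entirely: RDS PG already holds all the fields, so nothing needs to be filled in.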





    DB0, DB1, DB2, DB3, ..., DB255


    db0, host?, port?
    db1, host?, port?
    ...



    tbl0, tbl1, tbl2, ..., tbl127
    tbl128, tbl129, tbl130, ..., tbl255


    tbl0, db?
    tbl1, db?
    ...
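
The lists above read as a two-level lookup: each table belongs to a database, and each database lives at some host and port. A minimal Java sketch of such a lookup follows. The class name, the modulo-256 rule for choosing a table, and the sample host name are assumptions made for illustration; the article does not state how an order id is assigned to a table.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical two-level routing: order id -> table -> database -> endpoint. */
final class ShardRouterSketch {
    static final int TABLES = 256;   // tbl0 .. tbl255, as in the list above

    private final Map<Integer, Integer> tableToDb = new HashMap<>();    // "tbl?, db?" entries
    private final Map<Integer, String> dbToEndpoint = new HashMap<>();  // "db?, host?, port?" entries

    void mapTable(int table, int db) {
        tableToDb.put(table, db);
    }

    void mapDb(int db, String host, int port) {
        dbToEndpoint.put(db, host + ":" + port);
    }

    /** Assumed rule: the table is the order id modulo the number of tables. */
    static int tableFor(long orderId) {
        return (int) Math.floorMod(orderId, (long) TABLES);
    }

    /** Returns "tblN@host:port" for an order id, or throws if a mapping is missing. */
    String route(long orderId) {
        int table = tableFor(orderId);
        Integer db = tableToDb.get(table);
        if (db == null) {
            throw new IllegalStateException("no database mapped for tbl" + table);
        }
        String endpoint = dbToEndpoint.get(db);
        if (endpoint == null) {
            throw new IllegalStateException("no endpoint mapped for db" + db);
        }
        return "tbl" + table + "@" + endpoint;
    }
}
```

Keeping the two maps separate means a table can be moved to another database, or a database to another host, by changing a single entry.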


HDB PG is still kept, for real-time analysis of massive data at PB scale.



1. Create the full-width order feed table. (All attributes could also be stored in a single jsonb column, since PostgreSQL supports the JSONB type. Other multi-value types in PostgreSQL include hstore and xml.)

    create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);


Use the insert ... on conflict ... do update syntax to merge order attributes.

    insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
    insert into feed (id, c3, c4) values (2,99,290001) on conflict (id) do update set c3=excluded.c3, c4=excluded.c4 ;
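
To make the merge semantics concrete, here is a small in-memory model in Java of what the two statements above do to the row with id 2. It is only a sketch of the behaviour, not a database client: the class and method names are made up for illustration, and columns that were never sent are simply absent from the map, which stands in for NULL.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** In-memory model of merging partial feed messages by id (not a database client). */
final class FeedMergeSketch {
    // id -> (column name -> value); a column that was never sent stays absent, like NULL.
    private final Map<Long, Map<String, Integer>> rows = new HashMap<>();

    /** Mirrors "insert ... on conflict (id) do update set <the columns that were sent>". */
    void upsert(long id, Map<String, Integer> sentColumns) {
        rows.computeIfAbsent(id, k -> new TreeMap<>()).putAll(sentColumns);
    }

    Map<String, Integer> row(long id) {
        return rows.get(id);
    }

    public static void main(String[] args) {
        FeedMergeSketch feed = new FeedMergeSketch();

        Map<String, Integer> first = new HashMap<>();
        first.put("c1", 2);
        first.put("c2", 30001);

        Map<String, Integer> second = new HashMap<>();
        second.put("c3", 99);
        second.put("c4", 290001);

        feed.upsert(2L, first);    // first message for order 2
        feed.upsert(2L, second);   // later message adds c3 and c4 and keeps c1 and c2

        System.out.println(feed.row(2L));   // prints {c1=2, c2=30001, c3=99, c4=290001}
    }
}
```

The point to notice is that the second message does not reset c1 and c2, because only the columns named in the update clause are touched.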




    create or replace function tg1() returns trigger as $$
    declare
    begin
      -- Rule definition. In real use, this can be joined with a rule definition table.
      -- Send an asynchronous message when c2 is greater than 1000.
      perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(inserted)) from inserted where c2>1000;
      -- Multiple rules handled by a single notify call:
      --   perform pg_notify(
      --     'channel_1',
      --     case
      --       when c2>1000 then 'Resone:c2 overflow::'||row_to_json(inserted)
      --       when c1>200 then 'Resone:c1 overflow::'||row_to_json(inserted)
      --     end
      --   )
      --   from inserted
      --   where
      --     c2 > 1000
      --     or c1 > 200;
      -- Multiple rules can be written as several notify calls, or merged into one NOTIFY.
      return null;
    end;
    $$ language plpgsql strict;


    create or replace function tg2() returns trigger as $$
    declare
    begin
      -- Rule definition. In real use, this can be joined with a rule definition table.
      -- Send an asynchronous message when c2 is greater than 9999.
      perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;
      -- Multiple rules, a single notify call, one CHANNEL:
      --   perform pg_notify(
      --     'channel_1',
      --     case
      --       when NEW.c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
      --       when NEW.c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
      --     end
      --   )
      --   where
      --     NEW.c2 > 1000
      --     or NEW.c1 > 200;
      -- Multiple rules, a single notify call, several CHANNELs:
      --   perform pg_notify(
      --     case
      --       when NEW.c2>1000 then 'channel_1'
      --       when NEW.c1>200 then 'channel_2'
      --     end,
      --     case
      --       when NEW.c2>1000 then 'Resone:c2 overflow::'||row_to_json(NEW)
      --       when NEW.c1>200 then 'Resone:c1 overflow::'||row_to_json(NEW)
      --     end
      --   )
      --   where
      --     NEW.c2 > 1000
      --     or NEW.c1 > 200;
      -- Multiple rules can be written as several notify calls, or merged into one NOTIFY.
      -- For example:
      --   perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2 > 1000;
      --   perform pg_notify('channel_2', 'Resone:c1 overflow::'||row_to_json(NEW)) where NEW.c1 > 200;
      -- Rules can also be stored in a TABLE, which makes them dynamic.
      -- Keep the rules short: they are processed serially, so long rules reduce write throughput.
      -- The udf takes a feed value and a rule_table row and returns boolean; the decision logic lives in the UDF.
      --   perform pg_notify(channel_column, resone_column||'::'||row_to_json(NEW)) from rule_table where udf(NEW::feed, rule_table);
      return null;
    end;
    $$ language plpgsql strict;
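
The rule-table idea mentioned in the comments can be pictured with a small in-memory model. The Java sketch below is an illustration only: the `Rule` class, the row representation as `{c1, c2}`, and the `channel|reason` output format are assumptions, and the real check would run inside the trigger through the UDF. It corresponds to the "several notify calls" variant, where every matching rule produces its own message.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** In-memory model of evaluating a rule table against one feed row. */
final class RuleTableSketch {

    /** One row of a hypothetical rule table: channel, reason text, and a check on the row. */
    static final class Rule {
        final String channel;
        final String reason;
        final Predicate<int[]> matches;   // the row is passed as {c1, c2}

        Rule(String channel, String reason, Predicate<int[]> matches) {
            this.channel = channel;
            this.reason = reason;
            this.matches = matches;
        }
    }

    /** Returns "channel|reason" for every rule the row matches, in rule order. */
    static List<String> evaluate(List<Rule> rules, int c1, int c2) {
        int[] row = {c1, c2};
        List<String> notifications = new ArrayList<>();
        for (Rule rule : rules) {   // rules are checked one after another, so cost grows with their number
            if (rule.matches.test(row)) {
                notifications.add(rule.channel + "|" + rule.reason);
            }
        }
        return notifications;
    }

    public static void main(String[] args) {
        List<Rule> rules = new ArrayList<>();
        rules.add(new Rule("channel_1", "c2 overflow", row -> row[1] > 1000));
        rules.add(new Rule("channel_2", "c1 overflow", row -> row[0] > 200));

        System.out.println(evaluate(rules, 2, 30001));   // prints [channel_1|c2 overflow]
    }
}
```

Because the loop visits every rule for every row, the model also shows why the comments warn against long rule lists on a write-heavy table.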




    -- Option 1: statement-level triggers (new rows are exposed as the transition table "inserted")
    create trigger tg1 after insert on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();
    create trigger tg2 after update on feed REFERENCING NEW TABLE AS inserted for each statement execute procedure tg1();


    -- Option 2: row-level triggers (an alternative to option 1; the trigger names are the same)
    create trigger tg1 after insert on feed for each row execute procedure tg2();
    create trigger tg2 after update on feed for each row execute procedure tg2();



    listen channel_1;
    Receive messages:
    loop
      sleep ?;
      get message;
    end loop


    postgres=# insert into feed (id, c1, c2) values (2,2,30001) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
    INSERT 0 1


    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2,"c1":2,"c2":30001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
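
On the receiving side, the payload shown above has to be split back into the rule text and the row. A minimal Java sketch follows, assuming the `<reason>::<row as JSON>` layout produced by the triggers in this article; the class name is hypothetical, and the JSON part is left unparsed to avoid depending on a JSON library.

```java
/** Splits a payload of the form "<reason>::<row as JSON>" into its two parts. */
final class FeedAlertSketch {
    final String reason;    // for example "Resone:c2 overflow" (spelling as emitted by the trigger)
    final String rowJson;   // the row_to_json(...) text, not parsed here

    private FeedAlertSketch(String reason, String rowJson) {
        this.reason = reason;
        this.rowJson = rowJson;
    }

    /** Splits at the first "::"; the reason text itself must therefore not contain "::". */
    static FeedAlertSketch parse(String payload) {
        int separator = payload.indexOf("::");
        if (separator < 0) {
            throw new IllegalArgumentException("missing '::' separator: " + payload);
        }
        return new FeedAlertSketch(payload.substring(0, separator), payload.substring(separator + 2));
    }

    public static void main(String[] args) {
        FeedAlertSketch alert = parse("Resone:c2 overflow::{\"id\":2,\"c1\":2,\"c2\":30001}");
        System.out.println(alert.reason);    // prints Resone:c2 overflow
        System.out.println(alert.rowJson);   // prints {"id":2,"c1":2,"c2":30001}
    }
}
```

With the default PostgreSQL configuration a NOTIFY payload must be shorter than 8000 bytes, so very wide rows may need to send only the id and let the receiver fetch the rest.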


    postgres=# insert into feed (id, c1, c2) select id,random()*100, random()*1001 from generate_series(1,10000) t(id) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;
    INSERT 0 10000
    Time: 59.528 ms


    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":362,"c1":92,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4061,"c1":90,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4396,"c1":89,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5485,"c1":72,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6027,"c1":56,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":6052,"c1":91,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7893,"c1":84,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8158,"c1":73,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.


    postgres=# update feed set c1=1;
    UPDATE 10000
    Time: 33.444 ms


    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":1928,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2492,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2940,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":2981,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4271,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4539,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7089,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7619,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8001,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8511,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8774,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":9394,"c1":1,"c2":1001,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 38445.

Stress test 1: single-row real-time writes


    vi test.sql
    \set id random(1,10000000)
    \set c1 random(1,1001)
    \set c2 random(1,10000)
    insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;

2. Stress test result: a processing throughput of 167,000 rows/s.

    transaction type: ./test.sql
    scaling factor: 1
    query mode: prepared
    number of clients: 56
    number of threads: 56
    duration: 120 s
    number of transactions actually processed: 20060111
    latency average = 0.335 ms
    latency stddev = 0.173 ms
    tps = 167148.009836 (including connections establishing)
    tps = 167190.475312 (excluding connections establishing)
    script statistics:
     - statement latencies in milliseconds:
             0.002  \set id random(1,10000000)
             0.001  \set c1 random(1,1001)
             0.000  \set c2 random(1,10000)
             0.332  insert into feed (id, c1, c2) values (:id, :c1, :c2) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2 ;


    postgres=# listen channel_1;
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3027121,"c1":393,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 738.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5623104,"c1":177,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 758.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":3850742,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":5244809,"c1":55,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 716.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":4062585,"c1":380,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 722.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":8536437,"c1":560,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 695.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":7327211,"c1":365,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 728.
    Asynchronous notification "channel_1" with payload "Resone:c2 overflow::{"id":431739,"c1":824,"c2":10000,"c3":null,"c4":null,"c5":null,"c6":null,"c7":null,"c8":null,"c9":null,"c10":null,"c11":null,"c12":null}" received from server process with PID 731.



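"PostgreSQL Schemaless Design and Stress Testing for the China Railway Order System"

"Implementing On-Demand Sharding in PostgreSQL (a plpgsql Schemaless Implementation of the TimescaleDB Extension's Automatic Sharding)"

"Implementing Schemaless in PostgreSQL"

"PostgreSQL Time-Series Best Practices: Database Design for a Securities Trading System (Alibaba Cloud RDS PostgreSQL Best Practices)"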

Stress test 2: real-time writes to sharded tables on a single instance


    create table feed(id int8 primary key, c1 int, c2 int, c3 int, c4 int, c5 int, c6 int, c7 int, c8 int, c9 int, c10 int, c11 int, c12 int);


    create or replace function tg() returns trigger as $$
    declare
    begin
      -- Send an asynchronous message when c2 is greater than 9999.
      perform pg_notify('channel_1', 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;
      -- Example of writing to separate channels, with the channel suffix passed in as a trigger parameter
      -- (writing to a single channel also works; it depends on the design requirements):
      --   perform pg_notify('channel_'||TG_ARGV[0], 'Resone:c2 overflow::'||row_to_json(NEW)) where NEW.c2>9999;
      return null;
    end;
    $$ language plpgsql strict;


    do language plpgsql $$
    declare
    begin
      for i in 1..512 loop
        execute 'create table feed'||i||'(like feed including all) inherits (feed)';
        -- Create row-level triggers. This example uses a static rule (when (...));
        -- in real use, use dynamic rules and process all rows.
        execute 'create trigger tg1 after insert on feed'||i||' for each row WHEN (NEW.c2>9999) execute procedure tg()';
        execute 'create trigger tg2 after update on feed'||i||' for each row WHEN (NEW.c2>9999) execute procedure tg()';
      end loop;
    end;
    $$;



    -- Single-row upsert into the shard feed<$1>: $2 is the id, $3 and $4 are c1 and c2.
    create or replace function ins(int,int8,int,int) returns void as $$
    declare
    begin
      execute format('insert into feed%s (id,c1,c2) values (%s,%s,%s) on conflict (id) do update set c1=excluded.c1, c2=excluded.c2', $1, $2, $3, $4) ;
    end;
    $$ language plpgsql strict;


    -- Batch upsert into the shard feed<$1>.
    -- Note: as written, the id range is built from $1 (the table suffix); $2 is not used.
    create or replace function ins(int, int8) returns void as $$
    declare
    begin
      execute format('insert into feed%s (id,c1,c2) %s on conflict (id) do update set c1=excluded.c1, c2=excluded.c2', $1, 'select id, random()*100, random()*10000 from generate_series('||$1||','||$1+1000||') t (id)') ;
    end;
    $$ language plpgsql strict;



    vi test.sql
    \set suffix random(1,512)
    \set id random(1,10000000)
    \set c1 random(1,1001)
    \set c2 random(1,10000)
    select ins(:suffix, :id, :c1, :c2);


    vi test.sql
    \set suffix random(1,512)
    \set id random(1,10000000)
    select ins(:suffix, :id);


Single-row commits: a processing throughput of 150,000 rows/s.


    transaction type: ./test.sql
    scaling factor: 1
    query mode: prepared
    number of clients: 112
    number of threads: 112
    duration: 120 s
    number of transactions actually processed: 18047334
    latency average = 0.744 ms
    latency stddev = 0.450 ms
    tps = 150264.463046 (including connections establishing)
    tps = 150347.026261 (excluding connections establishing)
    script statistics:
     - statement latencies in milliseconds:
             0.002  \set suffix random(1,512)
             0.001  \set id random(1,10000000)
             0.001  \set c1 random(1,1001)
             0.000  \set c2 random(1,10000)
             0.742  select ins(:suffix, :id, :c1, :c2);

Batch commits (1,000 rows per batch): a processing throughput of 1.17 million rows/s.


    transaction type: ./test.sql
    scaling factor: 1
    query mode: prepared
    number of clients: 56
    number of threads: 56
    duration: 120 s
    number of transactions actually processed: 140508
    latency average = 47.820 ms
    latency stddev = 17.175 ms
    tps = 1169.851558 (including connections establishing)
    tps = 1170.150203 (excluding connections establishing)
    script statistics:
     - statement latencies in milliseconds:
             0.002  \set suffix random(1,512)
             0.000  \set id random(1,10000000)
            47.821  select ins(:suffix, :id);

Example: asynchronous notifications with JDBC


    import java.sql.*;

    public class NotificationTest {
        public static void main(String args[]) throws Exception {
            Class.forName("org.postgresql.Driver");
            String url = "jdbc:postgresql://localhost:5432/test";
            // Create two distinct connections, one for the notifier
            // and another for the listener to show the communication
            // works across connections although this example would
            // work fine with just one connection.
            Connection lConn = DriverManager.getConnection(url,"test","");
            Connection nConn = DriverManager.getConnection(url,"test","");
            // Create two threads, one to issue notifications and
            // the other to receive them.
            Listener listener = new Listener(lConn);
            Notifier notifier = new Notifier(nConn);
            listener.start();
            notifier.start();
        }
    }

    class Listener extends Thread {
        private Connection conn;
        private org.postgresql.PGConnection pgconn;

        Listener(Connection conn) throws SQLException {
            this.conn = conn;
            this.pgconn = (org.postgresql.PGConnection)conn;
            Statement stmt = conn.createStatement();
            stmt.execute("LISTEN mymessage");
            stmt.close();
        }

        public void run() {
            while (true) {
                try {
                    // issue a dummy query to contact the backend
                    // and receive any pending notifications.
                    Statement stmt = conn.createStatement();
                    ResultSet rs = stmt.executeQuery("SELECT 1");
                    rs.close();
                    stmt.close();
                    org.postgresql.PGNotification notifications[] = pgconn.getNotifications();
                    if (notifications != null) {
                        for (int i=0; i<notifications.length; i++) {
                            System.out.println("Got notification: " + notifications[i].getName());
                        }
                    }
                    // wait a while before checking again for new
                    // notifications
                    Thread.sleep(500);
                } catch (SQLException sqle) {
                    sqle.printStackTrace();
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
    }

    class Notifier extends Thread {
        private Connection conn;

        public Notifier(Connection conn) {
            this.conn = conn;
        }

        public void run() {
            while (true) {
                try {
                    Statement stmt = conn.createStatement();
                    stmt.execute("NOTIFY mymessage");
                    stmt.close();
                    Thread.sleep(2000);
                } catch (SQLException sqle) {
                    sqle.printStackTrace();
                } catch (InterruptedException ie) {
                    ie.printStackTrace();
                }
            }
        }
    }

Using asynchronous notifications with libpq




"PostgreSQL Triggers: Detailed Usage, Part 1"

"PostgreSQL Triggers: Detailed Usage, Part 2"


1. Asynchronous messages must be received promptly; otherwise they take up space in the instance's $PGDATA/pg_notify directory.



    /*
     * The number of SLRU page buffers we use for the notification queue.
     */
    #define NUM_ASYNC_BUFFERS 8






Method: in the trigger, replace pg_notify with insert into feedback_table ....;


    with t1 as (select ctid from feedback_table order by crt_time limit 100)
    delete from feedback_table where
      ctid = any (array(select ctid from t1))
    returning *;


The trade-off is that this consumes more RDS PG IOPS (the writes generate WAL, and so does VACUUM).


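1. An anomaly that has already been pushed may be triggered again when the row is updated later. Comparing the OLD value with the NEW value in the trigger logic avoids this. This article does not cover it; in real use, the trigger code can be rewritten accordingly.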
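
One possible reading of "compare OLD and NEW" is to notify only when a row moves from the normal state into the abnormal state. The Java sketch below shows that predicate on its own, as a model of the logic rather than trigger code; the class name and the choice of this particular policy are assumptions. In the trigger itself the same condition would be written over OLD.c2 and NEW.c2.

```java
/** Decides whether an update should push an alert, given the old and new values of c2. */
final class RefireGuardSketch {

    /**
     * Returns true only when the new value breaks the rule and the old value did not.
     * A null old value stands for "no previous row" (an insert) or a NULL column.
     */
    static boolean shouldNotify(Integer oldC2, Integer newC2, int threshold) {
        boolean newViolates = newC2 != null && newC2 > threshold;
        boolean oldViolated = oldC2 != null && oldC2 > threshold;
        return newViolates && !oldViolated;
    }

    public static void main(String[] args) {
        int threshold = 9999;   // the threshold used by tg2() in this article
        System.out.println(shouldNotify(null, 10000, threshold));    // prints true
        System.out.println(shouldNotify(10000, 10000, threshold));   // prints false
        System.out.println(shouldNotify(5, 10000, threshold));       // prints true
    }
}
```

With this policy, an update that leaves an already reported row abnormal stays silent, while a row that recovers and later breaks the rule again is reported a second time.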


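1. A single RDS PostgreSQL instance reached a processing throughput of 1.17 million rows/s, which is very good value for the cost.

2. With 100 RDS PostgreSQL instances, a processing throughput of 100 million rows/s (6 billion rows per minute) is within easy reach, assuming throughput scales linearly across instances.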
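
The arithmetic behind these two figures can be checked with a few lines of Java. This is a sketch under two stated assumptions: the batch size is taken as the 1,000 rows per batch quoted in the article, and throughput is assumed to add up linearly across independent instances. The tps value is the "excluding connections establishing" figure from the batch test.

```java
/** Back-of-the-envelope throughput arithmetic for the batch stress test. */
final class ThroughputSketch {

    /** rows/s = transactions per second x rows written per transaction. */
    static double rowsPerSecond(double tps, int rowsPerTransaction) {
        return tps * rowsPerTransaction;
    }

    /** Assumes throughput adds up linearly across independent instances. */
    static double clusterRowsPerSecond(double rowsPerSecondPerInstance, int instances) {
        return rowsPerSecondPerInstance * instances;
    }

    public static void main(String[] args) {
        double single = rowsPerSecond(1170.150203, 1000);      // batch test, one instance
        double cluster = clusterRowsPerSecond(single, 100);    // 100 instances
        System.out.printf("one instance:  %.0f rows/s%n", single);
        System.out.printf("100 instances: %.0f rows/s, %.0f rows/min%n", cluster, cluster * 60);
    }
}
```

One instance comes out at about 1.17 million rows/s and 100 instances at about 117 million rows/s, which is above the 100 million rows/s (6 billion rows per minute) quoted above.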


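"Implementing update | delete limit in PostgreSQL: CTID Scans in Practice (Efficient Read-Once-Then-Delete)"

"Real-Time Processing Compared (Streaming, Lambda, Triggers): Time-Series Best Practices for IoT and Finance"

"PostgreSQL 10.0 Preview Feature Enhancement: Built-In Transition Tables in Trigger Functions"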



