Modules

Modules allow users to extend Flink’s built-in objects, such as defining functions that behave like Flink built-in functions. They are pluggable, and while Flink provides a few pre-built modules, users can write their own.

For example, users can define their own geo functions and plug them into Flink as built-in functions to be used in Flink SQL and Table APIs. Another example is users can load an out-of-shelf Hive module to use Hive built-in functions as Flink built-in functions.

Furthermore, a module can provide built-in table source and sink factories which disable Flink’s default discovery mechanism based on Java’s Service Provider Interfaces (SPI), or influence how connectors of temporary tables should be created without a corresponding catalog.

Module Types

CoreModule

CoreModule contains all of Flink’s system (built-in) functions and is loaded and enabled by default.

HiveModule

The HiveModule provides Hive built-in functions as Flink’s system functions to SQL and Table API users. Flink’s Hive documentation provides full details on setting up the module.

User-Defined Module

Users can develop custom modules by implementing the Module interface. To use custom modules in SQL CLI, users should develop both a module and its corresponding module factory by implementing the ModuleFactory interface.

A module factory defines a set of properties for configuring the module when the SQL CLI bootstraps. Properties are passed to a discovery service where the service tries to match the properties to a ModuleFactory and instantiate a corresponding module instance.

Module Lifecycle and Resolution Order

A module can be loaded, enabled, disabled and unloaded. When TableEnvironment loads a module initially, it enables the module by default. Flink supports multiple modules and keeps track of the loading order to resolve metadata. Besides, Flink only resolves the functions among enabled modules. E.g., when there are two functions of the same name residing in two modules, there will be three conditions.

  • If both of the modules are enabled, then Flink resolves the function according to the resolution order of the modules.
  • If one of them is disabled, then Flink resolves the function to the enabled module.
  • If both of the modules are disabled, then Flink cannot resolve the function.

Users can change the resolution order by using modules in a different declared order. E.g., users can specify Flink to find functions first in Hive by USE MODULES hive, core.

Besides, users can also disable modules by not declaring them. E.g., users can specify Flink to disable core module by USE MODULES hive (However, it is strongly not recommended disabling core module). Disable a module does not unload it, and users can enable it again by using it. E.g., users can bring back core module and place it in the first by USE MODULES core, hive. A module can be enabled only when it is loaded already. Using an unloaded module will throw an Exception. Eventually, users can unload a module.

The difference between disabling and unloading a module is that TableEnvironment still keeps the disabled modules, and users can list all loaded modules to view the disabled modules.

Namespace

Objects provided by modules are considered part of Flink’s system (built-in) objects; thus, they don’t have any namespaces.

How to Load, Unload, Use and List Modules

Using SQL

Users can use SQL to load/unload/use/list modules in both Table API and SQL CLI.

Java

  1. EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
  2. TableEnvironment tableEnv = TableEnvironment.create(settings);
  3. // Show initially loaded and enabled modules
  4. tableEnv.executeSql("SHOW MODULES").print();
  5. // +-------------+
  6. // | module name |
  7. // +-------------+
  8. // | core |
  9. // +-------------+
  10. tableEnv.executeSql("SHOW FULL MODULES").print();
  11. // +-------------+------+
  12. // | module name | used |
  13. // +-------------+------+
  14. // | core | true |
  15. // +-------------+------+
  16. // Load a hive module
  17. tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')");
  18. // Show all enabled modules
  19. tableEnv.executeSql("SHOW MODULES").print();
  20. // +-------------+
  21. // | module name |
  22. // +-------------+
  23. // | core |
  24. // | hive |
  25. // +-------------+
  26. // Show all loaded modules with both name and use status
  27. tableEnv.executeSql("SHOW FULL MODULES").print();
  28. // +-------------+------+
  29. // | module name | used |
  30. // +-------------+------+
  31. // | core | true |
  32. // | hive | true |
  33. // +-------------+------+
  34. // Change resolution order
  35. tableEnv.executeSql("USE MODULES hive, core");
  36. tableEnv.executeSql("SHOW MODULES").print();
  37. // +-------------+
  38. // | module name |
  39. // +-------------+
  40. // | hive |
  41. // | core |
  42. // +-------------+
  43. tableEnv.executeSql("SHOW FULL MODULES").print();
  44. // +-------------+------+
  45. // | module name | used |
  46. // +-------------+------+
  47. // | hive | true |
  48. // | core | true |
  49. // +-------------+------+
  50. // Disable core module
  51. tableEnv.executeSql("USE MODULES hive");
  52. tableEnv.executeSql("SHOW MODULES").print();
  53. // +-------------+
  54. // | module name |
  55. // +-------------+
  56. // | hive |
  57. // +-------------+
  58. tableEnv.executeSql("SHOW FULL MODULES").print();
  59. // +-------------+-------+
  60. // | module name | used |
  61. // +-------------+-------+
  62. // | hive | true |
  63. // | core | false |
  64. // +-------------+-------+
  65. // Unload hive module
  66. tableEnv.executeSql("UNLOAD MODULE hive");
  67. tableEnv.executeSql("SHOW MODULES").print();
  68. // Empty set
  69. tableEnv.executeSql("SHOW FULL MODULES").print();
  70. // +-------------+-------+
  71. // | module name | used |
  72. // +-------------+-------+
  73. // | hive | false |
  74. // +-------------+-------+

Scala

  1. val settings = EnvironmentSettings.inStreamingMode()
  2. val tableEnv = TableEnvironment.create(setting)
  3. // Show initially loaded and enabled modules
  4. tableEnv.executeSql("SHOW MODULES").print()
  5. // +-------------+
  6. // | module name |
  7. // +-------------+
  8. // | core |
  9. // +-------------+
  10. tableEnv.executeSql("SHOW FULL MODULES").print()
  11. // +-------------+------+
  12. // | module name | used |
  13. // +-------------+------+
  14. // | core | true |
  15. // +-------------+------+
  16. // Load a hive module
  17. tableEnv.executeSql("LOAD MODULE hive WITH ('hive-version' = '...')")
  18. // Show all enabled modules
  19. tableEnv.executeSql("SHOW MODULES").print()
  20. // +-------------+
  21. // | module name |
  22. // +-------------+
  23. // | core |
  24. // | hive |
  25. // +-------------+
  26. // Show all loaded modules with both name and use status
  27. tableEnv.executeSql("SHOW FULL MODULES")
  28. // +-------------+------+
  29. // | module name | used |
  30. // +-------------+------+
  31. // | core | true |
  32. // | hive | true |
  33. // +-------------+------+
  34. // Change resolution order
  35. tableEnv.executeSql("USE MODULES hive, core")
  36. tableEnv.executeSql("SHOW MODULES").print()
  37. // +-------------+
  38. // | module name |
  39. // +-------------+
  40. // | hive |
  41. // | core |
  42. // +-------------+
  43. tableEnv.executeSql("SHOW FULL MODULES").print()
  44. // +-------------+------+
  45. // | module name | used |
  46. // +-------------+------+
  47. // | hive | true |
  48. // | core | true |
  49. // +-------------+------+
  50. // Disable core module
  51. tableEnv.executeSql("USE MODULES hive")
  52. tableEnv.executeSql("SHOW MODULES").print()
  53. // +-------------+
  54. // | module name |
  55. // +-------------+
  56. // | hive |
  57. // +-------------+
  58. tableEnv.executeSql("SHOW FULL MODULES").print()
  59. // +-------------+-------+
  60. // | module name | used |
  61. // +-------------+-------+
  62. // | hive | true |
  63. // | core | false |
  64. // +-------------+-------+
  65. // Unload hive module
  66. tableEnv.executeSql("UNLOAD MODULE hive")
  67. tableEnv.executeSql("SHOW MODULES").print()
  68. // Empty set
  69. tableEnv.executeSql("SHOW FULL MODULES").print()
  70. // +-------------+-------+
  71. // | module name | used |
  72. // +-------------+-------+
  73. // | hive | false |
  74. // +-------------+-------+

Python

  1. from pyflink.table import *
  2. # environment configuration
  3. settings = EnvironmentSettings.inStreamingMode()
  4. t_env = TableEnvironment.create(settings)
  5. # Show initially loaded and enabled modules
  6. t_env.execute_sql("SHOW MODULES").print()
  7. # +-------------+
  8. # | module name |
  9. # +-------------+
  10. # | core |
  11. # +-------------+
  12. t_env.execute_sql("SHOW FULL MODULES").print()
  13. # +-------------+------+
  14. # | module name | used |
  15. # +-------------+------+
  16. # | core | true |
  17. # +-------------+------+
  18. # Load a hive module
  19. t_env.execute_sql("LOAD MODULE hive WITH ('hive-version' = '...')")
  20. # Show all enabled modules
  21. t_env.execute_sql("SHOW MODULES").print()
  22. # +-------------+
  23. # | module name |
  24. # +-------------+
  25. # | core |
  26. # | hive |
  27. # +-------------+
  28. # Show all loaded modules with both name and use status
  29. t_env.execute_sql("SHOW FULL MODULES").print()
  30. # +-------------+------+
  31. # | module name | used |
  32. # +-------------+------+
  33. # | core | true |
  34. # | hive | true |
  35. # +-------------+------+
  36. # Change resolution order
  37. t_env.execute_sql("USE MODULES hive, core")
  38. t_env.execute_sql("SHOW MODULES").print()
  39. # +-------------+
  40. # | module name |
  41. # +-------------+
  42. # | hive |
  43. # | core |
  44. # +-------------+
  45. t_env.execute_sql("SHOW FULL MODULES").print()
  46. # +-------------+------+
  47. # | module name | used |
  48. # +-------------+------+
  49. # | hive | true |
  50. # | core | true |
  51. # +-------------+------+
  52. # Disable core module
  53. t_env.execute_sql("USE MODULES hive")
  54. t_env.execute_sql("SHOW MODULES").print()
  55. # +-------------+
  56. # | module name |
  57. # +-------------+
  58. # | hive |
  59. # +-------------+
  60. t_env.execute_sql("SHOW FULL MODULES").print()
  61. # +-------------+-------+
  62. # | module name | used |
  63. # +-------------+-------+
  64. # | hive | true |
  65. # | core | false |
  66. # +-------------+-------+
  67. # Unload hive module
  68. t_env.execute_sql("UNLOAD MODULE hive")
  69. t_env.execute_sql("SHOW MODULES").print()
  70. # Empty set
  71. t_env.execute_sql("SHOW FULL MODULES").print()
  72. # +-------------+-------+
  73. # | module name | used |
  74. # +-------------+-------+
  75. # | hive | false |
  76. # +-------------+-------+

SQL Client

  1. -- Show initially loaded and enabled modules
  2. Flink SQL> SHOW MODULES;
  3. +-------------+
  4. | module name |
  5. +-------------+
  6. | core |
  7. +-------------+
  8. 1 row in set
  9. Flink SQL> SHOW FULL MODULES;
  10. +-------------+------+
  11. | module name | used |
  12. +-------------+------+
  13. | core | true |
  14. +-------------+------+
  15. 1 row in set
  16. -- Load a hive module
  17. Flink SQL> LOAD MODULE hive WITH ('hive-version' = '...');
  18. -- Show all enabled modules
  19. Flink SQL> SHOW MODULES;
  20. +-------------+
  21. | module name |
  22. +-------------+
  23. | core |
  24. | hive |
  25. +-------------+
  26. 2 rows in set
  27. -- Show all loaded modules with both name and use status
  28. Flink SQL> SHOW FULL MODULES;
  29. +-------------+------+
  30. | module name | used |
  31. +-------------+------+
  32. | core | true |
  33. | hive | true |
  34. +-------------+------+
  35. 2 rows in set
  36. -- Change resolution order
  37. Flink SQL> USE MODULES hive, core ;
  38. Flink SQL> SHOW MODULES;
  39. +-------------+
  40. | module name |
  41. +-------------+
  42. | hive |
  43. | core |
  44. +-------------+
  45. 2 rows in set
  46. Flink SQL> SHOW FULL MODULES;
  47. +-------------+------+
  48. | module name | used |
  49. +-------------+------+
  50. | hive | true |
  51. | core | true |
  52. +-------------+------+
  53. 2 rows in set
  54. -- Unload hive module
  55. Flink SQL> UNLOAD MODULE hive;
  56. Flink SQL> SHOW MODULES;
  57. Empty set
  58. Flink SQL> SHOW FULL MODULES;
  59. +-------------+-------+
  60. | module name | used |
  61. +-------------+-------+
  62. | hive | false |
  63. +-------------+-------+
  64. 1 row in set

YAML

All modules defined using YAML must provide a type property that specifies the type. The following types are supported out of the box.

ModuleType Value
CoreModulecore
HiveModulehive
  1. modules:
  2. - name: core
  3. type: core
  4. - name: hive
  5. type: hive

When using SQL, module name is used to perform the module discovery. It is parsed as a simple identifier and case-sensitive.

Using Java, Scala or Python

Users can use Java, Scala or Python to load/unload/use/list modules programmatically.

Java

  1. EnvironmentSettings settings = EnvironmentSettings.inStreamingMode();
  2. TableEnvironment tableEnv = TableEnvironment.create(settings);
  3. // Show initially loaded and enabled modules
  4. tableEnv.listModules();
  5. // +-------------+
  6. // | module name |
  7. // +-------------+
  8. // | core |
  9. // +-------------+
  10. tableEnv.listFullModules();
  11. // +-------------+------+
  12. // | module name | used |
  13. // +-------------+------+
  14. // | core | true |
  15. // +-------------+------+
  16. // Load a hive module
  17. tableEnv.loadModule("hive", new HiveModule());
  18. // Show all enabled modules
  19. tableEnv.listModules();
  20. // +-------------+
  21. // | module name |
  22. // +-------------+
  23. // | core |
  24. // | hive |
  25. // +-------------+
  26. // Show all loaded modules with both name and use status
  27. tableEnv.listFullModules();
  28. // +-------------+------+
  29. // | module name | used |
  30. // +-------------+------+
  31. // | core | true |
  32. // | hive | true |
  33. // +-------------+------+
  34. // Change resolution order
  35. tableEnv.useModules("hive", "core");
  36. tableEnv.listModules();
  37. // +-------------+
  38. // | module name |
  39. // +-------------+
  40. // | hive |
  41. // | core |
  42. // +-------------+
  43. tableEnv.listFullModules();
  44. // +-------------+------+
  45. // | module name | used |
  46. // +-------------+------+
  47. // | hive | true |
  48. // | core | true |
  49. // +-------------+------+
  50. // Disable core module
  51. tableEnv.useModules("hive");
  52. tableEnv.listModules();
  53. // +-------------+
  54. // | module name |
  55. // +-------------+
  56. // | hive |
  57. // +-------------+
  58. tableEnv.listFullModules();
  59. // +-------------+-------+
  60. // | module name | used |
  61. // +-------------+-------+
  62. // | hive | true |
  63. // | core | false |
  64. // +-------------+-------+
  65. // Unload hive module
  66. tableEnv.unloadModule("hive");
  67. tableEnv.listModules();
  68. // Empty set
  69. tableEnv.listFullModules();
  70. // +-------------+-------+
  71. // | module name | used |
  72. // +-------------+-------+
  73. // | hive | false |
  74. // +-------------+-------+

Scala

  1. val settings = EnvironmentSettings.inStreamingMode()
  2. val tableEnv = TableEnvironment.create(setting)
  3. // Show initially loaded and enabled modules
  4. tableEnv.listModules()
  5. // +-------------+
  6. // | module name |
  7. // +-------------+
  8. // | core |
  9. // +-------------+
  10. tableEnv.listFullModules()
  11. // +-------------+------+
  12. // | module name | used |
  13. // +-------------+------+
  14. // | core | true |
  15. // +-------------+------+
  16. // Load a hive module
  17. tableEnv.loadModule("hive", new HiveModule())
  18. // Show all enabled modules
  19. tableEnv.listModules()
  20. // +-------------+
  21. // | module name |
  22. // +-------------+
  23. // | core |
  24. // | hive |
  25. // +-------------+
  26. // Show all loaded modules with both name and use status
  27. tableEnv.listFullModules()
  28. // +-------------+------+
  29. // | module name | used |
  30. // +-------------+------+
  31. // | core | true |
  32. // | hive | true |
  33. // +-------------+------+
  34. // Change resolution order
  35. tableEnv.useModules("hive", "core")
  36. tableEnv.listModules()
  37. // +-------------+
  38. // | module name |
  39. // +-------------+
  40. // | hive |
  41. // | core |
  42. // +-------------+
  43. tableEnv.listFullModules()
  44. // +-------------+------+
  45. // | module name | used |
  46. // +-------------+------+
  47. // | hive | true |
  48. // | core | true |
  49. // +-------------+------+
  50. // Disable core module
  51. tableEnv.useModules("hive")
  52. tableEnv.listModules()
  53. // +-------------+
  54. // | module name |
  55. // +-------------+
  56. // | hive |
  57. // +-------------+
  58. tableEnv.listFullModules()
  59. // +-------------+-------+
  60. // | module name | used |
  61. // +-------------+-------+
  62. // | hive | true |
  63. // | core | false |
  64. // +-------------+-------+
  65. // Unload hive module
  66. tableEnv.unloadModule("hive")
  67. tableEnv.listModules()
  68. // Empty set
  69. tableEnv.listFullModules()
  70. // +-------------+-------+
  71. // | module name | used |
  72. // +-------------+-------+
  73. // | hive | false |
  74. // +-------------+-------+

Python

  1. from pyflink.table import *
  2. # environment configuration
  3. settings = EnvironmentSettings.inStreamingMode()
  4. t_env = TableEnvironment.create(settings)
  5. # Show initially loaded and enabled modules
  6. t_env.list_modules()
  7. # +-------------+
  8. # | module name |
  9. # +-------------+
  10. # | core |
  11. # +-------------+
  12. t_env.list_full_modules()
  13. # +-------------+------+
  14. # | module name | used |
  15. # +-------------+------+
  16. # | core | true |
  17. # +-------------+------+
  18. # Load a hive module
  19. t_env.load_module("hive", HiveModule())
  20. # Show all enabled modules
  21. t_env.list_modules()
  22. # +-------------+
  23. # | module name |
  24. # +-------------+
  25. # | core |
  26. # | hive |
  27. # +-------------+
  28. # Show all loaded modules with both name and use status
  29. t_env.list_full_modules()
  30. # +-------------+------+
  31. # | module name | used |
  32. # +-------------+------+
  33. # | core | true |
  34. # | hive | true |
  35. # +-------------+------+
  36. # Change resolution order
  37. t_env.use_modules("hive", "core")
  38. t_env.list_modules()
  39. # +-------------+
  40. # | module name |
  41. # +-------------+
  42. # | hive |
  43. # | core |
  44. # +-------------+
  45. t_env.list_full_modules()
  46. # +-------------+------+
  47. # | module name | used |
  48. # +-------------+------+
  49. # | hive | true |
  50. # | core | true |
  51. # +-------------+------+
  52. # Disable core module
  53. t_env.use_modules("hive")
  54. t_env.list_modules()
  55. # +-------------+
  56. # | module name |
  57. # +-------------+
  58. # | hive |
  59. # +-------------+
  60. t_env.list_full_modules()
  61. # +-------------+-------+
  62. # | module name | used |
  63. # +-------------+-------+
  64. # | hive | true |
  65. # | core | false |
  66. # +-------------+-------+
  67. # Unload hive module
  68. t_env.unload_module("hive")
  69. t_env.list_modules()
  70. # Empty set
  71. t_env.list_full_modules()
  72. # +-------------+-------+
  73. # | module name | used |
  74. # +-------------+-------+
  75. # | hive | false |
  76. # +-------------+-------+